// Copyright Epic Games, Inc. All Rights Reserved.

#define _SILENCE_CXX17_C_HEADER_DEPRECATION_WARNING

#include <zenbase/refcount.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compress.h>
#include <zencore/except.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/testutils.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
#include <zencore/xxhash.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/zenhttp.h>
#include <zenutil/cache/cache.h>
#include <zenutil/cache/cacherequests.h>
#include <zenutil/chunkrequests.h>
#include <zenutil/logging/testformatter.h>
#include <zenutil/packageformat.h>
#include <zenutil/zenserverprocess.h>

#include <http_parser.h>

#if ZEN_PLATFORM_WINDOWS
#	pragma comment(lib, "Crypt32.lib")
#	pragma comment(lib, "Wldap32.lib")
#endif

ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
#undef GetObject
ZEN_THIRD_PARTY_INCLUDES_END

#include <atomic>
#include <filesystem>
#include <map>
#include <random>
#include <span>
#include <thread>
#include <typeindex>
#include <unordered_map>

#if ZEN_PLATFORM_WINDOWS
#	include <ppl.h>
#	include <process.h>
#endif

#if ZEN_USE_MIMALLOC
ZEN_THIRD_PARTY_INCLUDES_START
#	include <mimalloc-new-delete.h>
ZEN_THIRD_PARTY_INCLUDES_END
#endif

//////////////////////////////////////////////////////////////////////////

#include "projectclient.h"

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS
#	define ZEN_TEST_WITH_RUNNER 1
#	include <zencore/testing.h>
#	include <zencore/workthreadpool.h>
#endif

using namespace std::literals;

#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
struct Concurrency
{
	template<typename... T>
	static void parallel_invoke(T&&... t)
	{
		constexpr size_t NumTs			= sizeof...(t);
		std::thread		 Threads[NumTs] = {
			 std::thread(std::forward<T>(t))...,
		 };

		for (std::thread& Thread : Threads)
		{
			Thread.join();
		}
	}
};
#endif

//////////////////////////////////////////////////////////////////////////

#if ZEN_WITH_TESTS
zen::ZenServerEnvironment TestEnv;

int
main(int argc, char** argv)
{
#	if ZEN_USE_MIMALLOC
	mi_version();
#	endif
	using namespace std::literals;
	using namespace zen;

	zen::logging::InitializeLogging();

	zen::logging::SetLogLevel(zen::logging::level::Debug);
	spdlog::set_formatter(std::make_unique<zen::logging::full_test_formatter>("test", std::chrono::system_clock::now()));

	std::filesystem::path ProgramBaseDir = GetRunningExecutablePath().parent_path();
	std::filesystem::path TestBaseDir	 = std::filesystem::current_path() / ".test";

	// This is pretty janky because we're passing most of the options through to the test
	// framework, so we can't just use cxxopts (I think). This should ideally be cleaned up
	// somehow in the future

	std::string ServerClass;

	for (int i = 1; i < argc; ++i)
	{
		if (argv[i] == "--http"sv)
		{
			if ((i + 1) < argc)
			{
				ServerClass = argv[++i];
			}
		}
	}

	TestEnv.InitializeForTest(ProgramBaseDir, TestBaseDir, ServerClass);

	ZEN_INFO("Running tests...(base dir: '{}')", TestBaseDir);

	zen::testing::TestRunner Runner;
	Runner.ApplyCommandLine(argc, argv);

	return Runner.Run();
}

namespace zen::tests {

TEST_CASE("default.single")
{
	std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
	ZenServerInstance	  Instance(TestEnv);
	Instance.SetTestDir(TestDir);
	const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady();

	std::atomic<uint64_t> RequestCount{0};
	std::atomic<uint64_t> BatchCounter{0};

	ZEN_INFO("Running single server test...");

	auto IssueTestRequests = [&] {
		const uint64_t BatchNo	= BatchCounter.fetch_add(1);
		const int	   ThreadId = zen::GetCurrentThreadId();

		ZEN_INFO("query batch {} started (thread {})", BatchNo, ThreadId);
		cpr::Session cli;
		cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)});

		for (int i = 0; i < 10000; ++i)
		{
			auto res = cli.Get();
			++RequestCount;
		}
		ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
	};

	zen::Stopwatch timer;

	Concurrency::parallel_invoke(IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests,
								 IssueTestRequests);

	uint64_t Elapsed = timer.GetElapsedTimeMs();

	ZEN_INFO("{} requests in {} ({})",
			 RequestCount.load(),
			 zen::NiceTimeSpanMs(Elapsed),
			 zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}

TEST_CASE("default.loopback")
{
	std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

	ZenServerInstance Instance(TestEnv);
	Instance.SetTestDir(TestDir);
	const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--http-forceloopback");

	ZEN_INFO("Running loopback server test...");

	SUBCASE("ipv4 endpoint connectivity")
	{
		cpr::Session cli;
		cli.SetUrl(cpr::Url{fmt::format("http://127.0.0.1:{}/test/hello", PortNumber)});
		auto res = cli.Get();
		CHECK(!res.error);
	}

	SUBCASE("ipv6 endpoint connectivity")
	{
		cpr::Session cli;
		cli.SetUrl(cpr::Url{fmt::format("http://[::1]:{}/test/hello", PortNumber)});
		auto res = cli.Get();
		CHECK(!res.error);
	}
}

TEST_CASE("multi.basic")
{
	ZenServerInstance	  Instance1(TestEnv);
	std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
	Instance1.SetTestDir(TestDir1);
	Instance1.SpawnServer();

	ZenServerInstance	  Instance2(TestEnv);
	std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
	Instance2.SetTestDir(TestDir2);
	Instance2.SpawnServer();

	ZEN_INFO("Waiting...");

	const uint16_t PortNum1 = Instance1.WaitUntilReady();
	CHECK_MESSAGE(PortNum1 != 0, Instance1.GetLogOutput());
	const uint16_t PortNum2 = Instance2.WaitUntilReady();
	CHECK_MESSAGE(PortNum2 != 0, Instance1.GetLogOutput());

	std::atomic<uint64_t> RequestCount{0};
	std::atomic<uint64_t> BatchCounter{0};

	auto IssueTestRequests = [&](int PortNumber) {
		const uint64_t BatchNo	= BatchCounter.fetch_add(1);
		const int	   ThreadId = zen::GetCurrentThreadId();

		ZEN_INFO("query batch {} started (thread {}) for port {}", BatchNo, ThreadId, PortNumber);

		cpr::Session cli;
		cli.SetUrl(cpr::Url{fmt::format("http://localhost:{}/test/hello", PortNumber)});

		for (int i = 0; i < 10000; ++i)
		{
			auto res = cli.Get();
			++RequestCount;
		}
		ZEN_INFO("query batch {} ended (thread {})", BatchNo, ThreadId);
	};

	zen::Stopwatch timer;

	ZEN_INFO("Running multi-server test...");

	Concurrency::parallel_invoke([&] { IssueTestRequests(PortNum1); },
								 [&] { IssueTestRequests(PortNum2); },
								 [&] { IssueTestRequests(PortNum1); },
								 [&] { IssueTestRequests(PortNum2); });

	uint64_t Elapsed = timer.GetElapsedTimeMs();

	ZEN_INFO("{} requests in {} ({})",
			 RequestCount.load(),
			 zen::NiceTimeSpanMs(Elapsed),
			 zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}

TEST_CASE("project.basic")
{
	using namespace std::literals;

	std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

	ZenServerInstance Instance1(TestEnv);
	Instance1.SetTestDir(TestDir);

	const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();

	std::atomic<uint64_t> RequestCount{0};

	zen::Stopwatch timer;

	std::mt19937_64 mt;

	zen::StringBuilder<64> BaseUri;
	BaseUri << fmt::format("http://localhost:{}/prj/test", PortNumber);

	std::filesystem::path BinPath  = zen::GetRunningExecutablePath();
	std::filesystem::path RootPath = BinPath.parent_path().parent_path();
	BinPath						   = BinPath.lexically_relative(RootPath);

	SUBCASE("build store init")
	{
		{
			{
				zen::CbObjectWriter Body;
				Body << "id"
					 << "test";
				Body << "root" << RootPath.c_str();
				Body << "project"
					 << "/zooom";
				Body << "engine"
					 << "/zooom";

				zen::BinaryWriter MemOut;
				Body.Save(MemOut);

				auto Response = cpr::Post(cpr::Url{BaseUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
				CHECK(Response.status_code == 201);
			}

			{
				auto Response = cpr::Get(cpr::Url{BaseUri.c_str()});
				CHECK(Response.status_code == 200);

				zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();

				CHECK(ResponseObject["id"].AsString() == "test"sv);
				CHECK(ResponseObject["root"].AsString() == PathToUtf8(RootPath.c_str()));
			}
		}

		BaseUri << "/oplog/foobar";

		{
			{
				zen::StringBuilder<64> PostUri;
				PostUri << BaseUri;
				auto Response = cpr::Post(cpr::Url{PostUri.c_str()});
				CHECK(Response.status_code == 201);
			}

			{
				auto Response = cpr::Get(cpr::Url{BaseUri.c_str()});
				CHECK(Response.status_code == 200);

				zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();

				CHECK(ResponseObject["id"].AsString() == "foobar"sv);
				CHECK(ResponseObject["project"].AsString() == "test"sv);
			}
		}

		SUBCASE("build store persistence")
		{
			uint8_t AttachData[] = {1, 2, 3};

			zen::CompressedBuffer Attachment = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone(zen::MemoryView{AttachData, 3}));
			zen::CbAttachment	  Attach{Attachment, Attachment.DecodeRawHash()};

			zen::CbObjectWriter OpWriter;
			OpWriter << "key"
					 << "foo"
					 << "attachment" << Attach;

			const std::string_view ChunkId{
				"00000000"
				"00000000"
				"00010000"};
			auto FileOid = zen::Oid::FromHexString(ChunkId);

			OpWriter.BeginArray("files");
			OpWriter.BeginObject();
			OpWriter << "id" << FileOid;
			OpWriter << "clientpath"
					 << "/{engine}/client/side/path";
			OpWriter << "serverpath" << BinPath.c_str();
			OpWriter.EndObject();
			OpWriter.EndArray();

			zen::CbObject Op = OpWriter.Save();

			zen::CbPackage OpPackage(Op);
			OpPackage.AddAttachment(Attach);

			zen::BinaryWriter MemOut;
			legacy::SaveCbPackage(OpPackage, MemOut);

			{
				zen::StringBuilder<64> PostUri;
				PostUri << BaseUri << "/new";
				auto Response = cpr::Post(cpr::Url{PostUri.c_str()}, cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});

				REQUIRE(!Response.error);
				CHECK(Response.status_code == 201);
			}

			// Read file data

			{
				zen::StringBuilder<128> ChunkGetUri;
				ChunkGetUri << BaseUri << "/" << ChunkId;
				auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});

				REQUIRE(!Response.error);
				CHECK(Response.status_code == 200);
			}

			{
				zen::StringBuilder<128> ChunkGetUri;
				ChunkGetUri << BaseUri << "/" << ChunkId << "?offset=1&size=10";
				auto Response = cpr::Get(cpr::Url{ChunkGetUri.c_str()});

				REQUIRE(!Response.error);
				CHECK(Response.status_code == 200);
				CHECK(Response.text.size() == 10);
			}

			ZEN_INFO("+++++++");
		}
		SUBCASE("build store op commit") { ZEN_INFO("-------"); }
		SUBCASE("test chunk not found error")
		{
			for (size_t I = 0; I < 65; I++)
			{
				zen::StringBuilder<128> PostUri;
				PostUri << BaseUri << "/f77c781846caead318084604/info";
				auto Response = cpr::Get(cpr::Url{PostUri.c_str()});

				REQUIRE(!Response.error);
				CHECK(Response.status_code == 404);
			}
		}
	}

	const uint64_t Elapsed = timer.GetElapsedTimeMs();

	ZEN_INFO("{} requests in {} ({})",
			 RequestCount.load(),
			 zen::NiceTimeSpanMs(Elapsed),
			 zen::NiceRate(RequestCount, (uint32_t)Elapsed, "req"));
}

namespace utils {

	struct ZenConfig
	{
		std::filesystem::path DataDir;
		uint16_t			  Port;
		std::string			  BaseUri;
		std::string			  Args;

		static ZenConfig New(std::string Args = "")
		{
			return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = TestEnv.GetNewPortNumber(), .Args = std::move(Args)};
		}

		static ZenConfig New(uint16_t Port, std::string Args = "")
		{
			return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), .Port = Port, .Args = std::move(Args)};
		}

		static ZenConfig NewWithUpstream(uint16_t Port, uint16_t UpstreamPort)
		{
			return New(Port, fmt::format("--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", UpstreamPort));
		}

		static ZenConfig NewWithThreadedUpstreams(uint16_t NewPort, std::span<uint16_t> UpstreamPorts, bool Debug)
		{
			std::string Args = Debug ? "--debug" : "";
			for (uint16_t Port : UpstreamPorts)
			{
				Args = fmt::format("{}{}--upstream-zen-url=http://localhost:{}", Args, Args.length() > 0 ? " " : "", Port);
			}
			return New(NewPort, Args);
		}

		void Spawn(ZenServerInstance& Inst)
		{
			Inst.SetTestDir(DataDir);
			Inst.SpawnServer(Port, Args);
			const uint16_t InstancePort = Inst.WaitUntilReady();
			CHECK_MESSAGE(InstancePort != 0, Inst.GetLogOutput());

			if (Port != InstancePort)
				ZEN_DEBUG("relocation detected from {} to {}", Port, InstancePort);

			Port	= InstancePort;
			BaseUri = fmt::format("http://localhost:{}/z$", Port);
		}
	};

	void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg) { Cfg.Spawn(Server); }

	CompressedBuffer CreateSemiRandomBlob(size_t AttachmentSize, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast)
	{
		// Convoluted way to get a compressed buffer whose result it large enough to be a separate file
		// but also does actually compress
		const size_t			  PartCount = (AttachmentSize / (1u * 1024u * 64)) + 1;
		const size_t			  PartSize	= AttachmentSize / PartCount;
		auto					  Part		= SharedBuffer(CreateRandomBlob(PartSize));
		std::vector<SharedBuffer> Parts(PartCount, Part);
		size_t					  RemainPartSize = AttachmentSize - (PartSize * PartCount);
		if (RemainPartSize > 0)
		{
			Parts.push_back(SharedBuffer(CreateRandomBlob(RemainPartSize)));
		}
		CompressedBuffer Value = CompressedBuffer::Compress(CompositeBuffer(std::move(Parts)), OodleCompressor::Mermaid, CompressionLevel);
		return Value;
	};

	std::vector<std::pair<Oid, CompressedBuffer>> CreateAttachments(const std::span<const size_t>& Sizes)
	{
		std::vector<std::pair<Oid, CompressedBuffer>> Result;
		Result.reserve(Sizes.size());
		for (size_t Size : Sizes)
		{
			CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(CreateRandomBlob(Size)));
			Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
		}
		return Result;
	}

	std::vector<std::pair<Oid, CompressedBuffer>> CreateSemiRandomAttachments(const std::span<const size_t>& Sizes)
	{
		std::vector<std::pair<Oid, CompressedBuffer>> Result;
		Result.reserve(Sizes.size());
		for (size_t Size : Sizes)
		{
			CompressedBuffer Compressed =
				CreateSemiRandomBlob(Size, Size > 1024u * 1024u ? OodleCompressionLevel::None : OodleCompressionLevel::VeryFast);
			Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
		}
		return Result;
	}

}  // namespace utils

TEST_CASE("zcache.basic")
{
	using namespace std::literals;

	std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

	const int kIterationCount = 100;

	auto HashKey = [](int i) -> zen::IoHash { return zen::IoHash::HashBuffer(&i, sizeof i); };

	{
		ZenServerInstance Instance1(TestEnv);
		Instance1.SetTestDir(TestDir);

		const uint16_t	  PortNumber = Instance1.SpawnServerAndWaitUntilReady();
		const std::string BaseUri	 = fmt::format("http://localhost:{}/z$", PortNumber);

		// Populate with some simple data

		for (int i = 0; i < kIterationCount; ++i)
		{
			zen::CbObjectWriter Cbo;
			Cbo << "index" << i;

			zen::BinaryWriter MemOut;
			Cbo.Save(MemOut);

			zen::IoHash Key = HashKey(i);

			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)},
											cpr::Body{(const char*)MemOut.Data(), MemOut.Size()},
											cpr::Header{{"Content-Type", "application/x-ue-cb"}});

			CHECK(Result.status_code == 201);
		}

		// Retrieve data

		for (int i = 0; i < kIterationCount; ++i)
		{
			zen::IoHash Key = zen::IoHash::HashBuffer(&i, sizeof i);

			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});

			CHECK(Result.status_code == 200);
		}

		// Ensure bad bucket identifiers are rejected

		{
			zen::CbObjectWriter Cbo;
			Cbo << "index" << 42;

			zen::BinaryWriter MemOut;
			Cbo.Save(MemOut);

			zen::IoHash Key = HashKey(442);

			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "te!st", Key)},
											cpr::Body{(const char*)MemOut.Data(), MemOut.Size()},
											cpr::Header{{"Content-Type", "application/x-ue-cb"}});

			CHECK(Result.status_code == 400);
		}
	}

	// Verify that the data persists between process runs (the previous server has exited at this point)

	{
		ZenServerInstance Instance1(TestEnv);
		Instance1.SetTestDir(TestDir);
		const uint16_t PortNumber = Instance1.SpawnServerAndWaitUntilReady();

		const std::string BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);

		// Retrieve data again

		for (int i = 0; i < kIterationCount; ++i)
		{
			zen::IoHash Key = HashKey(i);

			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, "test", Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});

			CHECK(Result.status_code == 200);
		}
	}
}

TEST_CASE("zcache.cbpackage")
{
	using namespace std::literals;

	auto CreateTestPackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
		auto Data			= zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
		auto CompressedData = zen::CompressedBuffer::Compress(Data);

		OutAttachmentKey = CompressedData.DecodeRawHash();

		zen::CbWriter Obj;
		Obj.BeginObject("obj"sv);
		Obj.AddBinaryAttachment("data", OutAttachmentKey);
		Obj.EndObject();

		zen::CbPackage Package;
		Package.SetObject(Obj.Save().AsObject());
		Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));

		return Package;
	};

	auto SerializeToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
		zen::BinaryWriter MemStream;

		Package.Save(MemStream);

		return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
	};

	auto IsEqual = [](zen::CbPackage Lhs, zen::CbPackage Rhs) -> bool {
		std::span<const zen::CbAttachment> LhsAttachments = Lhs.GetAttachments();
		std::span<const zen::CbAttachment> RhsAttachments = Rhs.GetAttachments();

		if (LhsAttachments.size() != RhsAttachments.size())
		{
			return false;
		}

		for (const zen::CbAttachment& LhsAttachment : LhsAttachments)
		{
			const zen::CbAttachment* RhsAttachment = Rhs.FindAttachment(LhsAttachment.GetHash());
			CHECK(RhsAttachment);

			zen::SharedBuffer LhsBuffer = LhsAttachment.AsCompressedBinary().Decompress();
			CHECK(!LhsBuffer.IsNull());

			zen::SharedBuffer RhsBuffer = RhsAttachment->AsCompressedBinary().Decompress();
			CHECK(!RhsBuffer.IsNull());

			if (!LhsBuffer.GetView().EqualBytes(RhsBuffer.GetView()))
			{
				return false;
			}
		}

		return true;
	};

	SUBCASE("PUT/GET returns correct package")
	{
		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

		ZenServerInstance Instance1(TestEnv);
		Instance1.SetTestDir(TestDir);
		const uint16_t	  PortNumber = Instance1.SpawnServerAndWaitUntilReady();
		const std::string BaseUri	 = fmt::format("http://localhost:{}/z$", PortNumber);

		const std::string_view Bucket = "mosdef"sv;
		zen::IoHash			   Key;
		zen::CbPackage		   ExpectedPackage = CreateTestPackage(Key);

		// PUT
		{
			zen::IoBuffer Body	 = SerializeToBuffer(ExpectedPackage);
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)},
											cpr::Body{(const char*)Body.Data(), Body.Size()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 201);
		}

		// GET
		{
			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}", BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);

			zen::IoBuffer Response(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());

			zen::CbPackage Package;
			const bool	   Ok = Package.TryLoad(Response);
			CHECK(Ok);
			CHECK(IsEqual(Package, ExpectedPackage));
		}
	}

	SUBCASE("PUT propagates upstream")
	{
		// Setup local and remote server
		std::filesystem::path LocalDataDir	= TestEnv.CreateNewTestDir();
		std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();

		ZenServerInstance RemoteInstance(TestEnv);
		RemoteInstance.SetTestDir(RemoteDataDir);
		const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();

		ZenServerInstance LocalInstance(TestEnv);
		LocalInstance.SetTestDir(LocalDataDir);
		LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
								  fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
		const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
		CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());

		const auto LocalBaseUri	 = fmt::format("http://localhost:{}/z$", LocalPortNumber);
		const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);

		const std::string_view Bucket = "mosdef"sv;
		zen::IoHash			   Key;
		zen::CbPackage		   ExpectedPackage = CreateTestPackage(Key);

		// Store the cache record package in the local instance
		{
			zen::IoBuffer Body	 = SerializeToBuffer(ExpectedPackage);
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)},
											cpr::Body{(const char*)Body.Data(), Body.Size()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});

			CHECK(Result.status_code == 201);
		}

		// The cache record can be retrieved as a package from the local instance
		{
			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);

			zen::IoBuffer  Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
			zen::CbPackage Package;
			const bool	   Ok = Package.TryLoad(Body);
			CHECK(Ok);
			CHECK(IsEqual(Package, ExpectedPackage));
		}

		// The cache record can be retrieved as a package from the remote instance
		{
			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);

			zen::IoBuffer  Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
			zen::CbPackage Package;
			const bool	   Ok = Package.TryLoad(Body);
			CHECK(Ok);
			CHECK(IsEqual(Package, ExpectedPackage));
		}
	}

	SUBCASE("GET finds upstream when missing in local")
	{
		// Setup local and remote server
		std::filesystem::path LocalDataDir	= TestEnv.CreateNewTestDir();
		std::filesystem::path RemoteDataDir = TestEnv.CreateNewTestDir();

		ZenServerInstance RemoteInstance(TestEnv);
		RemoteInstance.SetTestDir(RemoteDataDir);
		const uint16_t RemotePortNumber = RemoteInstance.SpawnServerAndWaitUntilReady();

		ZenServerInstance LocalInstance(TestEnv);
		LocalInstance.SetTestDir(LocalDataDir);
		LocalInstance.SpawnServer(TestEnv.GetNewPortNumber(),
								  fmt::format("--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}", RemotePortNumber));
		const uint16_t LocalPortNumber = LocalInstance.WaitUntilReady();
		CHECK_MESSAGE(LocalPortNumber != 0, LocalInstance.GetLogOutput());

		const auto LocalBaseUri	 = fmt::format("http://localhost:{}/z$", LocalPortNumber);
		const auto RemoteBaseUri = fmt::format("http://localhost:{}/z$", RemotePortNumber);

		const std::string_view Bucket = "mosdef"sv;
		zen::IoHash			   Key;
		zen::CbPackage		   ExpectedPackage = CreateTestPackage(Key);

		// Store the cache record package in upstream cache
		{
			zen::IoBuffer Body	 = SerializeToBuffer(ExpectedPackage);
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", RemoteBaseUri, Bucket, Key)},
											cpr::Body{(const char*)Body.Data(), Body.Size()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});

			CHECK(Result.status_code == 201);
		}

		// The cache record can be retrieved as a package from the local cache
		{
			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalBaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);

			zen::IoBuffer  Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
			zen::CbPackage Package;
			const bool	   Ok = Package.TryLoad(Body);
			CHECK(Ok);
			CHECK(IsEqual(Package, ExpectedPackage));
		}
	}
}

TEST_CASE("zcache.policy")
{
	using namespace std::literals;
	using namespace utils;

	auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer {
		auto	 Buf  = zen::UniqueBuffer::Alloc(Size);
		uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData());
		for (uint64_t Idx = 0; Idx < Size; Idx++)
		{
			Data[Idx] = Idx % 256;
		}
		OutHash = zen::IoHash::HashBuffer(Data, Size);
		return Buf;
	};

	auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage {
		auto Data			= zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9}));
		auto CompressedData = zen::CompressedBuffer::Compress(Data);
		OutAttachmentKey	= CompressedData.DecodeRawHash();

		zen::CbWriter Writer;
		Writer.BeginObject("obj"sv);
		Writer.AddBinaryAttachment("data", OutAttachmentKey);
		Writer.EndObject();
		CbObject CacheRecord = Writer.Save().AsObject();

		OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView());

		zen::CbPackage Package;
		Package.SetObject(CacheRecord);
		Package.AddAttachment(zen::CbAttachment(CompressedData, OutAttachmentKey));

		return Package;
	};

	auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
		zen::BinaryWriter MemStream;
		Package.Save(MemStream);

		return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
	};

	SUBCASE("query - 'local' does not query upstream (binary)")
	{
		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamInst(TestEnv);
		UpstreamCfg.Spawn(UpstreamInst);
		const uint16_t UpstreamPort = UpstreamCfg.Port;

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
		ZenServerInstance LocalInst(TestEnv);
		LocalCfg.Spawn(LocalInst);

		const std::string_view Bucket = "legacy"sv;

		zen::IoHash Key;
		auto		BinaryValue = GenerateData(1024, Key);

		// Store binary cache value upstream
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
											cpr::Header{{"Content-Type", "application/octet-stream"}});
			CHECK(Result.status_code == 201);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(Result.status_code == 404);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(Result.status_code == 200);
		}
	}

	SUBCASE("store - 'local' does not store upstream (binary)")
	{
		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamInst(TestEnv);
		UpstreamCfg.Spawn(UpstreamInst);
		const uint16_t UpstreamPort = UpstreamCfg.Port;

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamPort);
		ZenServerInstance LocalInst(TestEnv);
		LocalCfg.Spawn(LocalInst);

		const auto Bucket = "legacy"sv;

		zen::IoHash Key;
		auto		BinaryValue = GenerateData(1024, Key);

		// Store binary cache value locally
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
											cpr::Header{{"Content-Type", "application/octet-stream"}});
			CHECK(Result.status_code == 201);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(Result.status_code == 404);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(Result.status_code == 200);
		}
	}

	SUBCASE("store - 'local/remote' stores local and upstream (binary)")
	{
		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamInst(TestEnv);
		UpstreamCfg.Spawn(UpstreamInst);

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
		ZenServerInstance LocalInst(TestEnv);
		LocalCfg.Spawn(LocalInst);

		const auto Bucket = "legacy"sv;

		zen::IoHash Key;
		auto		BinaryValue = GenerateData(1024, Key);

		// Store binary cache value locally and upstream
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
											cpr::Header{{"Content-Type", "application/octet-stream"}});
			CHECK(Result.status_code == 201);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(Result.status_code == 200);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(Result.status_code == 200);
		}
	}

	SUBCASE("query - 'local' does not query upstream (cppackage)")
	{
		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamInst(TestEnv);
		UpstreamCfg.Spawn(UpstreamInst);

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
		ZenServerInstance LocalInst(TestEnv);
		LocalCfg.Spawn(LocalInst);

		const auto Bucket = "legacy"sv;

		zen::IoHash	   Key;
		zen::IoHash	   PayloadId;
		zen::CbPackage Package = GeneratePackage(Key, PayloadId);
		auto		   Buf	   = ToBuffer(Package);

		// Store package upstream
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 201);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=QueryLocal,Store", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 404);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);
		}
	}

	SUBCASE("store - 'local' does not store upstream (cbpackge)")
	{
		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamInst(TestEnv);
		UpstreamCfg.Spawn(UpstreamInst);

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
		ZenServerInstance LocalInst(TestEnv);
		LocalCfg.Spawn(LocalInst);

		const auto Bucket = "legacy"sv;

		zen::IoHash	   Key;
		zen::IoHash	   PayloadId;
		zen::CbPackage Package = GeneratePackage(Key, PayloadId);
		auto		   Buf	   = ToBuffer(Package);

		// Store packge locally
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,StoreLocal", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 201);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 404);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);
		}
	}

	SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)")
	{
		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamInst(TestEnv);
		UpstreamCfg.Spawn(UpstreamInst);

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
		ZenServerInstance LocalInst(TestEnv);
		LocalCfg.Spawn(LocalInst);

		const auto Bucket = "legacy"sv;

		zen::IoHash	   Key;
		zen::IoHash	   PayloadId;
		zen::CbPackage Package = GeneratePackage(Key, PayloadId);
		auto		   Buf	   = ToBuffer(Package);

		// Store package locally and upstream
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}?Policy=Query,Store", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 201);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", UpstreamCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);
		}

		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}", LocalCfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 200);
		}
	}

	SUBCASE("skip - 'data' returns cache record without attachments/empty payload")
	{
		ZenConfig		  Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance Instance(TestEnv);
		Cfg.Spawn(Instance);

		const auto Bucket = "test"sv;

		zen::IoHash	   Key;
		zen::IoHash	   PayloadId;
		zen::CbPackage Package = GeneratePackage(Key, PayloadId);
		auto		   Buf	   = ToBuffer(Package);

		// Store package
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
											cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
			CHECK(Result.status_code == 201);
		}

		// Get package
		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cbpkg"}});
			CHECK(IsHttpSuccessCode(Result.status_code));
			IoBuffer  Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
			CbPackage ResponsePackage;
			CHECK(ResponsePackage.TryLoad(Buffer));
			CHECK(ResponsePackage.GetAttachments().size() == 0);
		}

		// Get record
		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/x-ue-cb"}});
			CHECK(IsHttpSuccessCode(Result.status_code));
			IoBuffer Buffer(IoBuffer::Wrap, Result.text.c_str(), Result.text.size());
			CbObject ResponseObject = zen::LoadCompactBinaryObject(Buffer);
			CHECK((bool)ResponseObject);
		}

		// Get payload
		{
			cpr::Response Result =
				cpr::Get(cpr::Url{fmt::format("{}/{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key, PayloadId)},
						 cpr::Header{{"Accept", "application/x-ue-comp"}});
			CHECK(IsHttpSuccessCode(Result.status_code));
			CHECK(Result.text.size() == 0);
		}
	}

	SUBCASE("skip - 'data' returns empty binary value")
	{
		ZenConfig		  Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance Instance(TestEnv);
		Cfg.Spawn(Instance);

		const auto Bucket = "test"sv;

		zen::IoHash Key;
		auto		BinaryValue = GenerateData(1024, Key);

		// Store binary cache value
		{
			cpr::Response Result = cpr::Put(cpr::Url{fmt::format("{}/{}/{}", Cfg.BaseUri, Bucket, Key)},
											cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()},
											cpr::Header{{"Content-Type", "application/octet-stream"}});
			CHECK(Result.status_code == 201);
		}

		// Get package
		{
			cpr::Response Result = cpr::Get(cpr::Url{fmt::format("{}/{}/{}?Policy=Default,SkipData", Cfg.BaseUri, Bucket, Key)},
											cpr::Header{{"Accept", "application/octet-stream"}});
			CHECK(IsHttpSuccessCode(Result.status_code));
			CHECK(Result.text.size() == 0);
		}
	}
}

TEST_CASE("zcache.rpc")
{
	using namespace std::literals;

	auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
								const zen::CacheKey&				   CacheKey,
								size_t								   PayloadSize,
								CachePolicy							   RecordPolicy) {
		std::vector<uint8_t> Data;
		Data.resize(PayloadSize);
		uint32_t  DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]);
		uint16_t* DataPtr  = reinterpret_cast<uint16_t*>(Data.data());
		for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx)
		{
			DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu);
		}
		if (PayloadSize & 1)
		{
			Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff);
		}
		CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
		Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
	};

	auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
												std::string_view Namespace,
												std::string_view Bucket,
												size_t			 Num,
												size_t			 PayloadSize = 1024,
												size_t			 KeyOffset	 = 1) -> std::vector<CacheKey> {
		std::vector<zen::CacheKey> OutKeys;

		for (uint32_t Key = 1; Key <= Num; ++Key)
		{
			zen::IoHash KeyHash;
			((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key);
			const zen::CacheKey CacheKey   = zen::CacheKey::Create(Bucket, KeyHash);

			cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
			AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
			OutKeys.push_back(CacheKey);

			CbPackage Package;
			CHECK(Request.Format(Package));

			IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
			cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
											 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
											 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});

			CHECK(Result.status_code == 200);
		}

		return OutKeys;
	};

	struct GetCacheRecordResult
	{
		zen::CbPackage						 Response;
		cacherequests::GetCacheRecordsResult Result;
		bool								 Success;
	};

	auto GetCacheRecords = [](std::string_view		   BaseUri,
							  std::string_view		   Namespace,
							  std::span<zen::CacheKey> Keys,
							  zen::CachePolicy		   Policy,
							  zen::RpcAcceptOptions	   AcceptOptions = zen::RpcAcceptOptions::kNone,
							  int					   Pid			 = 0) -> GetCacheRecordResult {
		cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic	= kCbPkgMagic,
														 .AcceptOptions = static_cast<uint16_t>(AcceptOptions),
														 .ProcessPid	= Pid,
														 .DefaultPolicy = Policy,
														 .Namespace		= std::string(Namespace)};
		for (const CacheKey& Key : Keys)
		{
			Request.Requests.push_back({.Key = Key});
		}

		CbObjectWriter RequestWriter;
		CHECK(Request.Format(RequestWriter));

		BinaryWriter Body;
		RequestWriter.Save(Body);

		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});

		GetCacheRecordResult OutResult;

		if (Result.status_code == 200)
		{
			CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
			CHECK(!Response.IsNull());
			OutResult.Response = std::move(Response);
			CHECK(OutResult.Result.Parse(OutResult.Response));
			OutResult.Success = true;
		}

		return OutResult;
	};

	SUBCASE("get cache records")
	{
		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

		ZenServerInstance Inst(TestEnv);
		Inst.SetTestDir(TestDir);

		const uint16_t	  BasePort = Inst.SpawnServerAndWaitUntilReady();
		const std::string BaseUri  = fmt::format("http://localhost:{}/z$", BasePort);

		CachePolicy				   Policy = CachePolicy::Default;
		std::vector<zen::CacheKey> Keys	  = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
		GetCacheRecordResult	   Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);

		CHECK(Result.Result.Results.size() == Keys.size());

		for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
		{
			const CacheKey& ExpectedKey = Keys[Index++];
			CHECK(Record);
			CHECK(Record->Key == ExpectedKey);
			CHECK(Record->Values.size() == 1);

			for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
			{
				CHECK(Value.Body);
			}
		}
	}

	SUBCASE("get missing cache records")
	{
		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

		ZenServerInstance Inst(TestEnv);
		Inst.SetTestDir(TestDir);
		const uint16_t	  BasePort = Inst.SpawnServerAndWaitUntilReady();
		const std::string BaseUri  = fmt::format("http://localhost:{}/z$", BasePort);

		CachePolicy				   Policy		= CachePolicy::Default;
		std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 128);
		std::vector<zen::CacheKey> Keys;

		for (const zen::CacheKey& Key : ExistingKeys)
		{
			Keys.push_back(Key);
			Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero));
		}

		GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, Policy);

		CHECK(Result.Result.Results.size() == Keys.size());

		size_t KeyIndex = 0;
		for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
		{
			const bool Missing = Index++ % 2 != 0;

			if (Missing)
			{
				CHECK(!Record);
			}
			else
			{
				const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
				CHECK(Record->Key == ExpectedKey);
				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					CHECK(Value.Body);
				}
			}
		}
	}

	SUBCASE("policy - 'QueryLocal' does not query upstream")
	{
		using namespace utils;

		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamServer(TestEnv);
		SpawnServer(UpstreamServer, UpstreamCfg);

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
		ZenServerInstance LocalServer(TestEnv);
		SpawnServer(LocalServer, LocalCfg);

		std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);

		CachePolicy			 Policy = CachePolicy::QueryLocal;
		GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);

		CHECK(Result.Result.Results.size() == Keys.size());

		for (const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
		{
			CHECK(!Record);
		}
	}

	SUBCASE("policy - 'QueryRemote' does query upstream")
	{
		using namespace utils;

		ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
		ZenServerInstance UpstreamServer(TestEnv);
		SpawnServer(UpstreamServer, UpstreamCfg);

		ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
		ZenServerInstance LocalServer(TestEnv);
		SpawnServer(LocalServer, LocalCfg);

		std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, "ue4.ddc"sv, "mastodon"sv, 4);

		CachePolicy			 Policy = (CachePolicy::QueryLocal | CachePolicy::QueryRemote);
		GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, "ue4.ddc"sv, Keys, Policy);

		CHECK(Result.Result.Results.size() == Keys.size());

		for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
		{
			CHECK(Record);
			const CacheKey& ExpectedKey = Keys[Index++];
			CHECK(Record->Key == ExpectedKey);
		}
	}

	SUBCASE("RpcAcceptOptions")
	{
		using namespace utils;

		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();

		ZenServerInstance Inst(TestEnv);
		Inst.SetTestDir(TestDir);

		const uint16_t	  BasePort = Inst.SpawnServerAndWaitUntilReady();
		const std::string BaseUri  = fmt::format("http://localhost:{}/z$", BasePort);

		std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024);
		std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size());

		std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end());
		Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end());

		{
			GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default);

			CHECK(Result.Result.Results.size() == Keys.size());

			for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
			{
				CHECK(Record);
				const CacheKey& ExpectedKey = Keys[Index++];
				CHECK(Record->Key == ExpectedKey);
				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					const IoBuffer&		  Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
					IoBufferFileReference Ref;
					bool				  IsFileRef = Body.GetFileReference(Ref);
					CHECK(!IsFileRef);
				}
			}
		}

		// File path, but only for large files
		{
			GetCacheRecordResult Result =
				GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences);

			CHECK(Result.Result.Results.size() == Keys.size());

			for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
			{
				CHECK(Record);
				const CacheKey& ExpectedKey = Keys[Index++];
				CHECK(Record->Key == ExpectedKey);
				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					const IoBuffer&		  Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
					IoBufferFileReference Ref;
					bool				  IsFileRef = Body.GetFileReference(Ref);
					CHECK(IsFileRef == (Body.Size() > 1024));
				}
			}
		}

		// File path, for all files
		{
			GetCacheRecordResult Result =
				GetCacheRecords(BaseUri,
								"ue4.ddc"sv,
								Keys,
								CachePolicy::Default,
								RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences);

			CHECK(Result.Result.Results.size() == Keys.size());

			for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
			{
				CHECK(Record);
				const CacheKey& ExpectedKey = Keys[Index++];
				CHECK(Record->Key == ExpectedKey);
				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					const IoBuffer&		  Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
					IoBufferFileReference Ref;
					bool				  IsFileRef = Body.GetFileReference(Ref);
					CHECK(IsFileRef);
				}
			}
		}

		// File handle, but only for large files
		{
			GetCacheRecordResult Result = GetCacheRecords(BaseUri,
														  "ue4.ddc"sv,
														  Keys,
														  CachePolicy::Default,
														  RpcAcceptOptions::kAllowLocalReferences,
														  GetCurrentProcessId());

			CHECK(Result.Result.Results.size() == Keys.size());

			for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
			{
				CHECK(Record);
				const CacheKey& ExpectedKey = Keys[Index++];
				CHECK(Record->Key == ExpectedKey);
				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					const IoBuffer&		  Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
					IoBufferFileReference Ref;
					bool				  IsFileRef = Body.GetFileReference(Ref);
					CHECK(IsFileRef == (Body.Size() > 1024));
				}
			}
		}

		// File handle, for all files
		{
			GetCacheRecordResult Result =
				GetCacheRecords(BaseUri,
								"ue4.ddc"sv,
								Keys,
								CachePolicy::Default,
								RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences,
								GetCurrentProcessId());

			CHECK(Result.Result.Results.size() == Keys.size());

			for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
			{
				CHECK(Record);
				const CacheKey& ExpectedKey = Keys[Index++];
				CHECK(Record->Key == ExpectedKey);
				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					const IoBuffer&		  Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
					IoBufferFileReference Ref;
					bool				  IsFileRef = Body.GetFileReference(Ref);
					CHECK(IsFileRef);
				}
			}
		}
	}
}

TEST_CASE("zcache.failing.upstream")
{
	// This is an exploratory test that takes a long time to run, so lets skip it by default
	if (true)
	{
		return;
	}

	using namespace std::literals;
	using namespace utils;

	ZenConfig		  Upstream1Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
	ZenServerInstance Upstream1Server(TestEnv);
	SpawnServer(Upstream1Server, Upstream1Cfg);

	ZenConfig		  Upstream2Cfg = ZenConfig::New(TestEnv.GetNewPortNumber());
	ZenServerInstance Upstream2Server(TestEnv);
	SpawnServer(Upstream2Server, Upstream2Cfg);

	std::vector<std::uint16_t> UpstreamPorts = {Upstream1Cfg.Port, Upstream2Cfg.Port};
	ZenConfig				   LocalCfg		 = ZenConfig::NewWithThreadedUpstreams(TestEnv.GetNewPortNumber(), UpstreamPorts, false);
	LocalCfg.Args += (" --upstream-thread-count 2");
	ZenServerInstance LocalServer(TestEnv);
	SpawnServer(LocalServer, LocalCfg);

	const uint16_t LocalPortNumber = LocalCfg.Port;
	const auto	   LocalUri		   = fmt::format("http://localhost:{}/z$", LocalPortNumber);
	const auto	   Upstream1Uri	   = fmt::format("http://localhost:{}/z$", Upstream1Cfg.Port);
	const auto	   Upstream2Uri	   = fmt::format("http://localhost:{}/z$", Upstream2Cfg.Port);

	bool Upstream1Running = true;
	bool Upstream2Running = true;

	using namespace std::literals;

	auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
								const zen::CacheKey&				   CacheKey,
								size_t								   PayloadSize,
								CachePolicy							   RecordPolicy) {
		std::vector<uint32_t> Data;
		Data.resize(PayloadSize / 4);
		for (uint32_t Idx = 0; Idx < PayloadSize / 4; ++Idx)
		{
			Data[Idx] = (*reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0])) + Idx;
		}

		CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size() * 4));
		Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
	};

	auto PutCacheRecords = [&AppendCacheRecord](std::string_view BaseUri,
												std::string_view Namespace,
												std::string_view Bucket,
												size_t			 Num,
												size_t			 KeyOffset,
												size_t			 PayloadSize = 8192) -> std::vector<CacheKey> {
		std::vector<zen::CacheKey> OutKeys;

		cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
		for (size_t Key = 1; Key <= Num; ++Key)
		{
			zen::IoHash KeyHash;
			((size_t*)(KeyHash.Hash))[0] = KeyOffset + Key;
			const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);

			AppendCacheRecord(Request, CacheKey, PayloadSize, CachePolicy::Default);
			OutKeys.push_back(CacheKey);
		}

		CbPackage Package;
		CHECK(Request.Format(Package));

		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});

		if (Result.status_code != 200)
		{
			ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
			OutKeys.clear();
		}

		return OutKeys;
	};

	struct GetCacheRecordResult
	{
		zen::CbPackage						 Response;
		cacherequests::GetCacheRecordsResult Result;
		bool								 Success = false;
	};

	auto GetCacheRecords = [](std::string_view		   BaseUri,
							  std::string_view		   Namespace,
							  std::span<zen::CacheKey> Keys,
							  zen::CachePolicy		   Policy) -> GetCacheRecordResult {
		cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic	= kCbPkgMagic,
														 .DefaultPolicy = Policy,
														 .Namespace		= std::string(Namespace)};
		for (const CacheKey& Key : Keys)
		{
			Request.Requests.push_back({.Key = Key});
		}

		CbObjectWriter RequestWriter;
		CHECK(Request.Format(RequestWriter));

		BinaryWriter Body;
		RequestWriter.Save(Body);

		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});

		GetCacheRecordResult OutResult;

		if (Result.status_code == 200)
		{
			CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
			if (!Response.IsNull())
			{
				OutResult.Response = std::move(Response);
				CHECK(OutResult.Result.Parse(OutResult.Response));
				OutResult.Success = true;
			}
		}
		else
		{
			ZEN_DEBUG("GetCacheRecords with {}, reason '{}'", Result.reason, Result.status_code);
		}

		return OutResult;
	};

	// Populate with some simple data

	CachePolicy Policy = CachePolicy::Default;

	const size_t	 ThreadCount	   = 128;
	const size_t	 KeyMultiplier	   = 16384;
	const size_t	 RecordsPerRequest = 64;
	WorkerThreadPool Pool(ThreadCount);

	std::atomic_size_t Completed = 0;

	auto   Keys = new std::vector<CacheKey>[ThreadCount * KeyMultiplier];
	RwLock KeysLock;

	for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
	{
		size_t Iteration = I;
		Pool.ScheduleWork([&] {
			std::vector<CacheKey> NewKeys = PutCacheRecords(LocalUri, "ue4.ddc"sv, "mastodon"sv, RecordsPerRequest, I * RecordsPerRequest);
			if (NewKeys.size() != RecordsPerRequest)
			{
				ZEN_DEBUG("PutCacheRecords iteration {} failed", Iteration);
				Completed.fetch_add(1);
				return;
			}
			{
				RwLock::ExclusiveLockScope _(KeysLock);
				Keys[Iteration].swap(NewKeys);
			}
			Completed.fetch_add(1);
		});
	}
	bool UseUpstream1 = false;
	while (Completed < ThreadCount * KeyMultiplier)
	{
		Sleep(8000);

		if (UseUpstream1)
		{
			if (Upstream2Running)
			{
				Upstream2Server.EnableTermination();
				Upstream2Server.Shutdown();
				Sleep(100);
				Upstream2Running = false;
			}
			if (!Upstream1Running)
			{
				SpawnServer(Upstream1Server, Upstream1Cfg);
				Upstream1Running = true;
			}
			UseUpstream1 = !UseUpstream1;
		}
		else
		{
			if (Upstream1Running)
			{
				Upstream1Server.EnableTermination();
				Upstream1Server.Shutdown();
				Sleep(100);
				Upstream1Running = false;
			}
			if (!Upstream2Running)
			{
				SpawnServer(Upstream2Server, Upstream2Cfg);
				Upstream2Running = true;
			}
			UseUpstream1 = !UseUpstream1;
		}
	}

	Completed = 0;
	for (size_t I = 0; I < ThreadCount * KeyMultiplier; I++)
	{
		size_t				   Iteration = I;
		std::vector<CacheKey>& LocalKeys = Keys[Iteration];
		if (LocalKeys.empty())
		{
			Completed.fetch_add(1);
			continue;
		}
		Pool.ScheduleWork([&] {
			GetCacheRecordResult Result = GetCacheRecords(LocalUri, "ue4.ddc"sv, LocalKeys, Policy);

			if (!Result.Success)
			{
				ZEN_DEBUG("GetCacheRecords iteration {} failed", Iteration);
				Completed.fetch_add(1);
				return;
			}

			if (Result.Result.Results.size() != LocalKeys.size())
			{
				ZEN_DEBUG("GetCacheRecords iteration {} empty records", Iteration);
				Completed.fetch_add(1);
				return;
			}
			for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
			{
				const CacheKey& ExpectedKey = LocalKeys[Index++];
				if (!Record)
				{
					continue;
				}
				if (Record->Key != ExpectedKey)
				{
					continue;
				}
				if (Record->Values.size() != 1)
				{
					continue;
				}

				for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
				{
					if (!Value.Body)
					{
						continue;
					}
				}
			}
			Completed.fetch_add(1);
		});
	}
	while (Completed < ThreadCount * KeyMultiplier)
	{
		Sleep(10);
	}
}

TEST_CASE("zcache.rpc.partialchunks")
{
	using namespace std::literals;
	using namespace utils;

	ZenConfig		  LocalCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
	ZenServerInstance Server(TestEnv);
	SpawnServer(Server, LocalCfg);

	std::vector<CompressedBuffer> Attachments;

	const auto BaseUri = fmt::format("http://localhost:{}/z$", Server.GetBasePort());

	auto GenerateKey = [](std::string_view Bucket, size_t KeyIndex) -> CacheKey {
		IoHash KeyHash;
		((size_t*)(KeyHash.Hash))[0] = KeyIndex;
		return CacheKey::Create(Bucket, KeyHash);
	};

	auto AppendCacheRecord = [](cacherequests::PutCacheRecordsRequest& Request,
								const CacheKey&						   CacheKey,
								size_t								   AttachmentCount,
								size_t								   AttachmentsSize,
								CachePolicy							   RecordPolicy) -> std::vector<std::pair<Oid, CompressedBuffer>> {
		std::vector<std::pair<Oid, CompressedBuffer>>		   AttachmentBuffers;
		std::vector<cacherequests::PutCacheRecordRequestValue> Attachments;
		for (size_t AttachmentIndex = 0; AttachmentIndex < AttachmentCount; AttachmentIndex++)
		{
			CompressedBuffer Value = CreateSemiRandomBlob(AttachmentsSize);
			AttachmentBuffers.push_back(std::make_pair(Oid::NewOid(), Value));
			Attachments.push_back({.Id = AttachmentBuffers.back().first, .Body = std::move(Value)});
		}
		Request.Requests.push_back({.Key = CacheKey, .Values = Attachments, .Policy = RecordPolicy});
		return AttachmentBuffers;
	};

	auto PutCacheRecords = [&AppendCacheRecord, &GenerateKey](
							   std::string_view BaseUri,
							   std::string_view Namespace,
							   std::string_view Bucket,
							   size_t			KeyOffset,
							   size_t			Num,
							   size_t			AttachmentCount,
							   size_t			AttachmentsSize =
								   8192) -> std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> {
		std::vector<std::pair<CacheKey, std::vector<std::pair<Oid, CompressedBuffer>>>> Keys;

		cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
		for (size_t Key = 1; Key <= Num; ++Key)
		{
			const CacheKey								  NewCacheKey = GenerateKey(Bucket, KeyOffset + Key);
			std::vector<std::pair<Oid, CompressedBuffer>> Attachments =
				AppendCacheRecord(Request, NewCacheKey, AttachmentCount, AttachmentsSize, CachePolicy::Default);
			Keys.push_back(std::make_pair(NewCacheKey, std::move(Attachments)));
		}

		CbPackage Package;
		CHECK(Request.Format(Package));

		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});

		if (Result.status_code != 200)
		{
			ZEN_DEBUG("PutCacheRecords failed with {}, reason '{}'", Result.status_code, Result.reason);
			Keys.clear();
		}

		return Keys;
	};

	std::string_view TestBucket					 = "partialcachevaluetests"sv;
	std::string_view TestNamespace				 = "ue4.ddc"sv;
	auto			 RecordsWithSmallAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 0, 3, 2, 4096u);
	CHECK(RecordsWithSmallAttachments.size() == 3);
	auto RecordsWithLargeAttachments = PutCacheRecords(BaseUri, TestNamespace, TestBucket, 10, 1, 2, 8u * 1024u * 1024u);
	CHECK(RecordsWithLargeAttachments.size() == 1);

	struct PartialOptions
	{
		uint64_t		 Offset		   = 0ull;
		uint64_t		 Size		   = ~0ull;
		RpcAcceptOptions AcceptOptions = RpcAcceptOptions::kNone;
	};

	auto GetCacheChunk = [](std::string_view	  BaseUri,
							std::string_view	  Namespace,
							const CacheKey&		  Key,
							const Oid&			  ValueId,
							const PartialOptions& Options = {}) -> cacherequests::GetCacheChunksResult {
		cacherequests::GetCacheChunksRequest Request = {
			.AcceptMagic   = kCbPkgMagic,
			.AcceptOptions = (uint16_t)Options.AcceptOptions,
			.Namespace	   = std::string(Namespace),
			.Requests	   = {{.Key = Key, .ValueId = ValueId, .RawOffset = Options.Offset, .RawSize = Options.Size}}};
		CbPackage Package;
		CHECK(Request.Format(Package));
		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});

		CHECK(Result.status_code == 200);

		CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
		bool	  Loaded   = !Response.IsNull();
		CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
		cacherequests::GetCacheChunksResult GetCacheChunksResult;
		CHECK(GetCacheChunksResult.Parse(Response));
		return GetCacheChunksResult;
	};

	auto GetAndVerifyChunk = [&GetCacheChunk](std::string_view		  BaseUri,
											  std::string_view		  Namespace,
											  const CacheKey&		  Key,
											  const Oid&			  ChunkId,
											  const CompressedBuffer& VerifyData,
											  const PartialOptions&	  Options = {}) {
		cacherequests::GetCacheChunksResult Result = GetCacheChunk(BaseUri, Namespace, Key, ChunkId, Options);
		CHECK(Result.Results.size() == 1);
		bool CanGetPartial = ((uint16_t)Options.AcceptOptions & (uint16_t)RpcAcceptOptions::kAllowPartialCacheChunks);
		if (!CanGetPartial)
		{
			CHECK(Result.Results[0].FragmentOffset == 0);
			CHECK(Result.Results[0].Body.GetCompressedSize() == VerifyData.GetCompressedSize());
		}
		IoBuffer SourceDecompressed = VerifyData.Decompress(Options.Offset, Options.Size).AsIoBuffer();
		IoBuffer ReceivedDecompressed =
			Result.Results[0].Body.Decompress(Options.Offset - Result.Results[0].FragmentOffset, Options.Size).AsIoBuffer();
		CHECK(SourceDecompressed.GetView().EqualBytes(ReceivedDecompressed.GetView()));
	};

	GetAndVerifyChunk(BaseUri,
					  TestNamespace,
					  RecordsWithSmallAttachments[0].first,
					  RecordsWithSmallAttachments[0].second[0].first,
					  RecordsWithSmallAttachments[0].second[0].second);
	GetAndVerifyChunk(BaseUri,
					  TestNamespace,
					  RecordsWithSmallAttachments[0].first,
					  RecordsWithSmallAttachments[0].second[0].first,
					  RecordsWithSmallAttachments[0].second[0].second,
					  PartialOptions{.Offset = 378, .Size = 519, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
	GetAndVerifyChunk(
		BaseUri,
		TestNamespace,
		RecordsWithSmallAttachments[0].first,
		RecordsWithSmallAttachments[0].second[0].first,
		RecordsWithSmallAttachments[0].second[0].second,
		PartialOptions{.Offset		  = 378,
					   .Size		  = 519,
					   .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
	GetAndVerifyChunk(BaseUri,
					  TestNamespace,
					  RecordsWithLargeAttachments[0].first,
					  RecordsWithLargeAttachments[0].second[0].first,
					  RecordsWithLargeAttachments[0].second[0].second,
					  PartialOptions{.AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
	GetAndVerifyChunk(BaseUri,
					  TestNamespace,
					  RecordsWithLargeAttachments[0].first,
					  RecordsWithLargeAttachments[0].second[0].first,
					  RecordsWithLargeAttachments[0].second[0].second,
					  PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u});
	GetAndVerifyChunk(
		BaseUri,
		TestNamespace,
		RecordsWithLargeAttachments[0].first,
		RecordsWithLargeAttachments[0].second[0].first,
		RecordsWithLargeAttachments[0].second[0].second,
		PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences});
	GetAndVerifyChunk(
		BaseUri,
		TestNamespace,
		RecordsWithLargeAttachments[0].first,
		RecordsWithLargeAttachments[0].second[0].first,
		RecordsWithLargeAttachments[0].second[0].second,
		PartialOptions{.Offset = 1024u * 1024u, .Size = 512u * 1024u, .AcceptOptions = RpcAcceptOptions::kAllowPartialCacheChunks});
	GetAndVerifyChunk(
		BaseUri,
		TestNamespace,
		RecordsWithLargeAttachments[0].first,
		RecordsWithLargeAttachments[0].second[0].first,
		RecordsWithLargeAttachments[0].second[0].second,
		PartialOptions{.Offset		  = 1024u * 1024u,
					   .Size		  = 512u * 1024u,
					   .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialCacheChunks});
	GetAndVerifyChunk(
		BaseUri,
		TestNamespace,
		RecordsWithLargeAttachments[0].first,
		RecordsWithLargeAttachments[0].second[0].first,
		RecordsWithLargeAttachments[0].second[0].second,
		PartialOptions{.Offset		  = 1024u * 1024u,
					   .Size		  = 512u * 1024u,
					   .AcceptOptions = RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences |
										RpcAcceptOptions::kAllowPartialCacheChunks});
}

TEST_CASE("zcache.rpc.allpolicies")
{
	using namespace std::literals;
	using namespace utils;

	ZenConfig		  UpstreamCfg = ZenConfig::New(TestEnv.GetNewPortNumber());
	ZenServerInstance UpstreamServer(TestEnv);
	SpawnServer(UpstreamServer, UpstreamCfg);

	ZenConfig		  LocalCfg = ZenConfig::NewWithUpstream(TestEnv.GetNewPortNumber(), UpstreamCfg.Port);
	ZenServerInstance LocalServer(TestEnv);
	SpawnServer(LocalServer, LocalCfg);

	const auto BaseUri = fmt::format("http://localhost:{}/z$", LocalServer.GetBasePort());

	std::string_view TestVersion   = "F72150A02AE34B57A9EC91D36BA1CE08"sv;
	std::string_view TestBucket	   = "allpoliciestest"sv;
	std::string_view TestNamespace = "ue4.ddc"sv;

	// NumKeys = (2 Value vs Record)*(2 SkipData vs Default)*(2 ForceMiss vs Not)*(2 use local)
	// *(2 use remote)*(2 UseValue Policy vs not)*(4 cases per type)
	constexpr int NumKeys	= 256;
	constexpr int NumValues = 4;
	Oid			  ValueIds[NumValues];
	IoHash		  Hash;
	for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
	{
		ExtendableStringBuilder<16> ValueName;
		ValueName << "ValueId_"sv << ValueIndex;
		static_assert(sizeof(IoHash) >= sizeof(Oid));
		ValueIds[ValueIndex] = Oid::FromMemory(IoHash::HashBuffer(ValueName.Data(), ValueName.Size() * sizeof(ValueName.Data()[0])).Hash);
	}

	struct KeyData;
	struct UserData
	{
		UserData& Set(KeyData* InKeyData, int InValueIndex)
		{
			Data	   = InKeyData;
			ValueIndex = InValueIndex;
			return *this;
		}
		KeyData* Data		= nullptr;
		int		 ValueIndex = 0;
	};
	struct KeyData
	{
		CompressedBuffer BufferValues[NumValues];
		uint64_t		 IntValues[NumValues];
		UserData		 ValueUserData[NumValues];
		bool			 ReceivedChunk[NumValues];
		CacheKey		 Key;
		UserData		 KeyUserData;
		uint32_t		 KeyIndex		  = 0;
		bool			 GetRequestsData  = true;
		bool			 UseValueAPI	  = false;
		bool			 UseValuePolicy	  = false;
		bool			 ForceMiss		  = false;
		bool			 UseLocal		  = true;
		bool			 UseRemote		  = true;
		bool			 ShouldBeHit	  = true;
		bool			 ReceivedPut	  = false;
		bool			 ReceivedGet	  = false;
		bool			 ReceivedPutValue = false;
		bool			 ReceivedGetValue = false;
	};
	struct CachePutRequest
	{
		CacheKey		  Key;
		CbObject		  Record;
		CacheRecordPolicy Policy;
		KeyData*		  Values;
		UserData*		  Data;
	};
	struct CachePutValueRequest
	{
		CacheKey		 Key;
		CompressedBuffer Value;
		CachePolicy		 Policy;
		UserData*		 Data;
	};
	struct CacheGetRequest
	{
		CacheKey		  Key;
		CacheRecordPolicy Policy;
		UserData*		  Data;
	};
	struct CacheGetValueRequest
	{
		CacheKey	Key;
		CachePolicy Policy;
		UserData*	Data;
	};
	struct CacheGetChunkRequest
	{
		CacheKey	Key;
		Oid			ValueId;
		uint64_t	RawOffset;
		uint64_t	RawSize;
		IoHash		RawHash;
		CachePolicy Policy;
		UserData*	Data;
	};

	KeyData							  KeyDatas[NumKeys];
	std::vector<CachePutRequest>	  PutRequests;
	std::vector<CachePutValueRequest> PutValueRequests;
	std::vector<CacheGetRequest>	  GetRequests;
	std::vector<CacheGetValueRequest> GetValueRequests;
	std::vector<CacheGetChunkRequest> ChunkRequests;

	for (uint32_t KeyIndex = 0; KeyIndex < NumKeys; ++KeyIndex)
	{
		IoHashStream KeyWriter;
		KeyWriter.Append(TestVersion.data(), TestVersion.length() * sizeof(TestVersion.data()[0]));
		KeyWriter.Append(&KeyIndex, sizeof(KeyIndex));
		IoHash	 KeyHash = KeyWriter.GetHash();
		KeyData& KeyData = KeyDatas[KeyIndex];

		KeyData.Key				 = CacheKey::Create(TestBucket, KeyHash);
		KeyData.KeyIndex		 = KeyIndex;
		KeyData.GetRequestsData	 = (KeyIndex & (1 << 1)) == 0;
		KeyData.UseValueAPI		 = (KeyIndex & (1 << 2)) != 0;
		KeyData.UseValuePolicy	 = (KeyIndex & (1 << 3)) != 0;
		KeyData.ForceMiss		 = (KeyIndex & (1 << 4)) == 0;
		KeyData.UseLocal		 = (KeyIndex & (1 << 5)) == 0;
		KeyData.UseRemote		 = (KeyIndex & (1 << 6)) == 0;
		KeyData.ShouldBeHit		 = !KeyData.ForceMiss && (KeyData.UseLocal || KeyData.UseRemote);
		CachePolicy SharedPolicy = KeyData.UseLocal ? CachePolicy::Local : CachePolicy::None;
		SharedPolicy |= KeyData.UseRemote ? CachePolicy::Remote : CachePolicy::None;
		CachePolicy PutPolicy = SharedPolicy;
		CachePolicy GetPolicy = SharedPolicy;
		GetPolicy |= !KeyData.GetRequestsData ? CachePolicy::SkipData : CachePolicy::None;
		CacheKey& Key = KeyData.Key;

		for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
		{
			KeyData.IntValues[ValueIndex] = static_cast<uint64_t>(KeyIndex) | (static_cast<uint64_t>(ValueIndex) << 32);
			KeyData.BufferValues[ValueIndex] =
				CompressedBuffer::Compress(SharedBuffer::MakeView(&KeyData.IntValues[ValueIndex], sizeof(KeyData.IntValues[ValueIndex])));
			KeyData.ReceivedChunk[ValueIndex] = false;
		}

		UserData& KeyUserData = KeyData.KeyUserData.Set(&KeyData, -1);
		for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
		{
			KeyData.ValueUserData[ValueIndex].Set(&KeyData, ValueIndex);
		}
		if (!KeyData.UseValueAPI)
		{
			CbObjectWriter Builder;
			Builder.BeginObject("key"sv);
			Builder << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
			Builder.EndObject();
			Builder.BeginArray("Values"sv);
			for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
			{
				Builder.BeginObject();
				Builder.AddObjectId("Id"sv, ValueIds[ValueIndex]);
				Builder.AddBinaryAttachment("RawHash"sv, KeyData.BufferValues[ValueIndex].DecodeRawHash());
				Builder.AddInteger("RawSize"sv, KeyData.BufferValues[ValueIndex].DecodeRawSize());
				Builder.EndObject();
			}
			Builder.EndArray();

			CacheRecordPolicy PutRecordPolicy;
			CacheRecordPolicy GetRecordPolicy;
			if (!KeyData.UseValuePolicy)
			{
				PutRecordPolicy = CacheRecordPolicy(PutPolicy);
				GetRecordPolicy = CacheRecordPolicy(GetPolicy);
			}
			else
			{
				// Switch the SkipData field in the Record policy so that if the CacheStore ignores the ValuePolicies
				// it will use the wrong value for SkipData and fail our tests.
				CacheRecordPolicyBuilder PutBuilder(PutPolicy ^ CachePolicy::SkipData);
				CacheRecordPolicyBuilder GetBuilder(GetPolicy ^ CachePolicy::SkipData);
				for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
				{
					PutBuilder.AddValuePolicy(ValueIds[ValueIndex], PutPolicy);
					GetBuilder.AddValuePolicy(ValueIds[ValueIndex], GetPolicy);
				}
				PutRecordPolicy = PutBuilder.Build();
				GetRecordPolicy = GetBuilder.Build();
			}
			if (!KeyData.ForceMiss)
			{
				PutRequests.push_back({Key, Builder.Save(), PutRecordPolicy, &KeyData, &KeyUserData});
			}
			GetRequests.push_back({Key, GetRecordPolicy, &KeyUserData});
			for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
			{
				UserData& ValueUserData = KeyData.ValueUserData[ValueIndex];
				ChunkRequests.push_back({Key, ValueIds[ValueIndex], 0, UINT64_MAX, IoHash(), GetPolicy, &ValueUserData});
			}
		}
		else
		{
			if (!KeyData.ForceMiss)
			{
				PutValueRequests.push_back({Key, KeyData.BufferValues[0], PutPolicy, &KeyUserData});
			}
			GetValueRequests.push_back({Key, GetPolicy, &KeyUserData});
			ChunkRequests.push_back({Key, Oid::Zero, 0, UINT64_MAX, IoHash(), GetPolicy, &KeyUserData});
		}
	}

	// PutCacheRecords
	{
		CachePolicy							  BatchDefaultPolicy = CachePolicy::Default;
		cacherequests::PutCacheRecordsRequest Request			 = {.AcceptMagic   = kCbPkgMagic,
																	.DefaultPolicy = BatchDefaultPolicy,
																	.Namespace	   = std::string(TestNamespace)};
		Request.Requests.reserve(PutRequests.size());
		for (CachePutRequest& PutRequest : PutRequests)
		{
			cacherequests::PutCacheRecordRequest& RecordRequest = Request.Requests.emplace_back();
			RecordRequest.Key									= PutRequest.Key;
			RecordRequest.Policy								= PutRequest.Policy;
			RecordRequest.Values.reserve(NumValues);
			for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
			{
				RecordRequest.Values.push_back({.Id = ValueIds[ValueIndex], .Body = PutRequest.Values->BufferValues[ValueIndex]});
			}
			PutRequest.Data->Data->ReceivedPut = true;
		}

		CbPackage Package;
		CHECK(Request.Format(Package));
		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
		CHECK_MESSAGE(Result.status_code == 200, "PutCacheRecords unexpectedly failed.");
	}

	// PutCacheValues
	{
		CachePolicy BatchDefaultPolicy = CachePolicy::Default;

		cacherequests::PutCacheValuesRequest Request = {.AcceptMagic   = kCbPkgMagic,
														.DefaultPolicy = BatchDefaultPolicy,
														.Namespace	   = std::string(TestNamespace)};
		Request.Requests.reserve(PutValueRequests.size());
		for (CachePutValueRequest& PutRequest : PutValueRequests)
		{
			Request.Requests.push_back({.Key = PutRequest.Key, .Body = PutRequest.Value, .Policy = PutRequest.Policy});
			PutRequest.Data->Data->ReceivedPutValue = true;
		}

		CbPackage Package;
		CHECK(Request.Format(Package));

		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
		CHECK_MESSAGE(Result.status_code == 200, "PutCacheValues unexpectedly failed.");
	}

	for (KeyData& KeyData : KeyDatas)
	{
		if (!KeyData.ForceMiss)
		{
			if (!KeyData.UseValueAPI)
			{
				CHECK_MESSAGE(KeyData.ReceivedPut, WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put.").c_str());
			}
			else
			{
				CHECK_MESSAGE(KeyData.ReceivedPutValue,
							  WriteToString<32>("Key ", KeyData.KeyIndex, " was unexpectedly not put to ValueAPI.").c_str());
			}
		}
	}

	// GetCacheRecords
	{
		CachePolicy							  BatchDefaultPolicy = CachePolicy::Default;
		cacherequests::GetCacheRecordsRequest Request			 = {.AcceptMagic   = kCbPkgMagic,
																	.DefaultPolicy = BatchDefaultPolicy,
																	.Namespace	   = std::string(TestNamespace)};
		Request.Requests.reserve(GetRequests.size());
		for (CacheGetRequest& GetRequest : GetRequests)
		{
			Request.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
		}

		CbPackage Package;
		CHECK(Request.Format(Package));
		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
		CHECK_MESSAGE(Result.status_code == 200, "GetCacheRecords unexpectedly failed.");
		CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
		bool	  Loaded   = !Response.IsNull();
		CHECK_MESSAGE(Loaded, "GetCacheRecords response failed to load.");
		cacherequests::GetCacheRecordsResult RequestResult;
		CHECK(RequestResult.Parse(Response));
		CHECK_MESSAGE(RequestResult.Results.size() == GetRequests.size(), "GetCacheRecords response count did not match request count.");
		for (int Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& RecordResult : RequestResult.Results)
		{
			bool			 Succeeded	= RecordResult.has_value();
			CacheGetRequest& GetRequest = GetRequests[Index++];
			KeyData*		 KeyData	= GetRequest.Data->Data;
			KeyData->ReceivedGet		= true;
			WriteToString<32> Name("Get(", KeyData->KeyIndex, ")");
			if (KeyData->ShouldBeHit)
			{
				CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
			}
			else if (KeyData->ForceMiss)
			{
				CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, " unexpectedly succeeded.").c_str());
			}
			if (!KeyData->ForceMiss && Succeeded)
			{
				CHECK_MESSAGE(RecordResult->Values.size() == NumValues,
							  WriteToString<32>(Name, " number of values did not match.").c_str());
				for (const cacherequests::GetCacheRecordResultValue& Value : RecordResult->Values)
				{
					int ExpectedValueIndex = 0;
					for (; ExpectedValueIndex < NumValues; ++ExpectedValueIndex)
					{
						if (ValueIds[ExpectedValueIndex] == Value.Id)
						{
							break;
						}
					}
					CHECK_MESSAGE(ExpectedValueIndex < NumValues, WriteToString<32>(Name, " could not find matching ValueId.").c_str());

					WriteToString<32> ValueName("Get(", KeyData->KeyIndex, ",", ExpectedValueIndex, ")");

					CompressedBuffer ExpectedValue = KeyData->BufferValues[ExpectedValueIndex];
					CHECK_MESSAGE(Value.RawHash == ExpectedValue.DecodeRawHash(),
								  WriteToString<32>(ValueName, " RawHash did not match.").c_str());
					CHECK_MESSAGE(Value.RawSize == ExpectedValue.DecodeRawSize(),
								  WriteToString<32>(ValueName, " RawSize did not match.").c_str());

					if (KeyData->GetRequestsData)
					{
						SharedBuffer Buffer = Value.Body.Decompress();
						CHECK_MESSAGE(Buffer.GetSize() == Value.RawSize,
									  WriteToString<32>(ValueName, " BufferSize did not match RawSize.").c_str());
						uint64_t ActualIntValue	  = ((const uint64_t*)Buffer.GetData())[0];
						uint64_t ExpectedIntValue = KeyData->IntValues[ExpectedValueIndex];
						CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(ValueName, " had unexpected data.").c_str());
					}
				}
			}
		}
	}

	// GetCacheValues
	{
		CachePolicy BatchDefaultPolicy = CachePolicy::Default;

		cacherequests::GetCacheValuesRequest GetCacheValuesRequest = {.AcceptMagic	 = kCbPkgMagic,
																	  .DefaultPolicy = BatchDefaultPolicy,
																	  .Namespace	 = std::string(TestNamespace)};
		GetCacheValuesRequest.Requests.reserve(GetValueRequests.size());
		for (CacheGetValueRequest& GetRequest : GetValueRequests)
		{
			GetCacheValuesRequest.Requests.push_back({.Key = GetRequest.Key, .Policy = GetRequest.Policy});
		}

		CbPackage Package;
		CHECK(GetCacheValuesRequest.Format(Package));

		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
		CHECK_MESSAGE(Result.status_code == 200, "GetCacheValues unexpectedly failed.");
		IoBuffer  MessageBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size());
		CbPackage Response = ParsePackageMessage(MessageBuffer);
		bool	  Loaded   = !Response.IsNull();
		CHECK_MESSAGE(Loaded, "GetCacheValues response failed to load.");
		cacherequests::GetCacheValuesResult GetCacheValuesResult;
		CHECK(GetCacheValuesResult.Parse(Response));
		for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheValuesResult.Results)
		{
			bool				  Succeeded = ValueResult.RawHash != IoHash::Zero;
			CacheGetValueRequest& Request	= GetValueRequests[Index++];
			KeyData*			  KeyData	= Request.Data->Data;
			KeyData->ReceivedGetValue		= true;
			WriteToString<32> Name("GetValue("sv, KeyData->KeyIndex, ")"sv);

			if (KeyData->ShouldBeHit)
			{
				CHECK_MESSAGE(Succeeded, WriteToString<32>(Name, " unexpectedly failed.").c_str());
			}
			else if (KeyData->ForceMiss)
			{
				CHECK_MESSAGE(!Succeeded, WriteToString<32>(Name, "unexpectedly succeeded.").c_str());
			}
			if (!KeyData->ForceMiss && Succeeded)
			{
				CompressedBuffer ExpectedValue = KeyData->BufferValues[0];
				CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
							  WriteToString<32>(Name, " RawHash did not match.").c_str());
				CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
							  WriteToString<32>(Name, " RawSize did not match.").c_str());

				if (KeyData->GetRequestsData)
				{
					SharedBuffer Buffer = ValueResult.Body.Decompress();
					CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
								  WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
					uint64_t ActualIntValue	  = ((const uint64_t*)Buffer.GetData())[0];
					uint64_t ExpectedIntValue = KeyData->IntValues[0];
					CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
				}
			}
		}
	}

	// GetCacheChunks
	{
		std::sort(ChunkRequests.begin(), ChunkRequests.end(), [](CacheGetChunkRequest& A, CacheGetChunkRequest& B) {
			return A.Key.Hash < B.Key.Hash;
		});
		CachePolicy							 BatchDefaultPolicy	   = CachePolicy::Default;
		cacherequests::GetCacheChunksRequest GetCacheChunksRequest = {.AcceptMagic	 = kCbPkgMagic,
																	  .DefaultPolicy = BatchDefaultPolicy,
																	  .Namespace	 = std::string(TestNamespace)};
		GetCacheChunksRequest.Requests.reserve(ChunkRequests.size());
		for (CacheGetChunkRequest& ChunkRequest : ChunkRequests)
		{
			GetCacheChunksRequest.Requests.push_back({.Key		 = ChunkRequest.Key,
													  .ValueId	 = ChunkRequest.ValueId,
													  .ChunkId	 = IoHash(),
													  .RawOffset = ChunkRequest.RawOffset,
													  .RawSize	 = ChunkRequest.RawSize,
													  .Policy	 = ChunkRequest.Policy});
		}
		CbPackage Package;
		CHECK(GetCacheChunksRequest.Format(Package));

		IoBuffer	  Body	 = FormatPackageMessageBuffer(Package).Flatten().AsIoBuffer();
		cpr::Response Result = cpr::Post(cpr::Url{fmt::format("{}/$rpc", BaseUri)},
										 cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}},
										 cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
		CHECK_MESSAGE(Result.status_code == 200, "GetCacheChunks unexpectedly failed.");
		CbPackage Response = ParsePackageMessage(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()));
		bool	  Loaded   = !Response.IsNull();
		CHECK_MESSAGE(Loaded, "GetCacheChunks response failed to load.");
		cacherequests::GetCacheChunksResult GetCacheChunksResult;
		CHECK(GetCacheChunksResult.Parse(Response));
		CHECK_MESSAGE(GetCacheChunksResult.Results.size() == ChunkRequests.size(),
					  "GetCacheChunks response count did not match request count.");

		for (int Index = 0; const cacherequests::CacheValueResult& ValueResult : GetCacheChunksResult.Results)
		{
			bool Succeeded = ValueResult.RawHash != IoHash::Zero;

			CacheGetChunkRequest& Request	   = ChunkRequests[Index++];
			KeyData*			  KeyData	   = Request.Data->Data;
			int					  ValueIndex   = Request.Data->ValueIndex >= 0 ? Request.Data->ValueIndex : 0;
			KeyData->ReceivedChunk[ValueIndex] = true;
			WriteToString<32> Name("GetChunks("sv, KeyData->KeyIndex, ","sv, ValueIndex, ")"sv);

			if (KeyData->ShouldBeHit)
			{
				CHECK_MESSAGE(Succeeded, WriteToString<256>(Name, " unexpectedly failed."sv).c_str());
			}
			else if (KeyData->ForceMiss)
			{
				CHECK_MESSAGE(!Succeeded, WriteToString<256>(Name, " unexpectedly succeeded."sv).c_str());
			}
			if (KeyData->ShouldBeHit && Succeeded)
			{
				CompressedBuffer ExpectedValue = KeyData->BufferValues[ValueIndex];
				CHECK_MESSAGE(ValueResult.RawHash == ExpectedValue.DecodeRawHash(),
							  WriteToString<32>(Name, " had unexpected RawHash.").c_str());
				CHECK_MESSAGE(ValueResult.RawSize == ExpectedValue.DecodeRawSize(),
							  WriteToString<32>(Name, " had unexpected RawSize.").c_str());

				if (KeyData->GetRequestsData)
				{
					SharedBuffer Buffer = ValueResult.Body.Decompress();
					CHECK_MESSAGE(Buffer.GetSize() == ValueResult.RawSize,
								  WriteToString<32>(Name, " BufferSize did not match RawSize.").c_str());
					uint64_t ActualIntValue	  = ((const uint64_t*)Buffer.GetData())[0];
					uint64_t ExpectedIntValue = KeyData->IntValues[ValueIndex];
					CHECK_MESSAGE(ActualIntValue == ExpectedIntValue, WriteToString<32>(Name, " had unexpected data.").c_str());
				}
			}
		}
	}

	for (KeyData& KeyData : KeyDatas)
	{
		if (!KeyData.UseValueAPI)
		{
			CHECK_MESSAGE(KeyData.ReceivedGet, WriteToString<32>("Get(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
			for (int ValueIndex = 0; ValueIndex < NumValues; ++ValueIndex)
			{
				CHECK_MESSAGE(
					KeyData.ReceivedChunk[ValueIndex],
					WriteToString<32>("GetChunks(", KeyData.KeyIndex, ",", ValueIndex, ") was unexpectedly not received.").c_str());
			}
		}
		else
		{
			CHECK_MESSAGE(KeyData.ReceivedGetValue,
						  WriteToString<32>("GetValue(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
			CHECK_MESSAGE(KeyData.ReceivedChunk[0],
						  WriteToString<32>("GetChunks(", KeyData.KeyIndex, ") was unexpectedly not received.").c_str());
		}
	}
}

class ZenServerTestHelper
{
public:
	ZenServerTestHelper(std::string_view HelperId, int ServerCount) : m_HelperId{HelperId}, m_ServerCount{ServerCount} {}
	~ZenServerTestHelper() {}

	void SpawnServers(std::string_view AdditionalServerArgs = std::string_view())
	{
		SpawnServers([](ZenServerInstance&) {}, AdditionalServerArgs);
	}

	void SpawnServers(auto&& Callback, std::string_view AdditionalServerArgs)
	{
		ZEN_INFO("{}: spawning {} server instances", m_HelperId, m_ServerCount);

		m_Instances.resize(m_ServerCount);

		for (int i = 0; i < m_ServerCount; ++i)
		{
			auto& Instance = m_Instances[i];
			Instance	   = std::make_unique<ZenServerInstance>(TestEnv);
			Instance->SetTestDir(TestEnv.CreateNewTestDir());
		}

		for (int i = 0; i < m_ServerCount; ++i)
		{
			auto& Instance = m_Instances[i];
			Callback(*Instance);
		}

		for (int i = 0; i < m_ServerCount; ++i)
		{
			auto& Instance = m_Instances[i];
			Instance->SpawnServer(TestEnv.GetNewPortNumber(), AdditionalServerArgs);
		}

		for (int i = 0; i < m_ServerCount; ++i)
		{
			auto&	 Instance	= m_Instances[i];
			uint16_t PortNumber = Instance->WaitUntilReady();
			CHECK_MESSAGE(PortNumber != 0, Instance->GetLogOutput());
		}
	}

	ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; }

private:
	std::string										m_HelperId;
	int												m_ServerCount = 0;
	std::vector<std::unique_ptr<ZenServerInstance>> m_Instances;
};

TEST_CASE("http.basics")
{
	using namespace std::literals;

	ZenServerTestHelper Servers{"http.basics"sv, 1};
	Servers.SpawnServers();

	ZenServerInstance& Instance = Servers.GetInstance(0);
	const std::string  BaseUri	= Instance.GetBaseUri();

	{
		cpr::Response r = cpr::Get(cpr::Url{fmt::format("{}/testing/hello", BaseUri)});
		CHECK(IsHttpSuccessCode(r.status_code));
	}

	{
		cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/hello", BaseUri)});
		CHECK_EQ(r.status_code, 404);
	}

	{
		cpr::Response r = cpr::Post(cpr::Url{fmt::format("{}/testing/echo", BaseUri)}, cpr::Body{"yoyoyoyo"});
		CHECK_EQ(r.status_code, 200);
		CHECK_EQ(r.text, "yoyoyoyo");
	}
}

TEST_CASE("http.package")
{
	using namespace std::literals;

	ZenServerTestHelper Servers{"http.package"sv, 1};
	Servers.SpawnServers();

	ZenServerInstance& Instance = Servers.GetInstance(0);
	const std::string  BaseUri	= Instance.GetBaseUri();

	static const uint8_t Data1[] = {0, 1, 2, 3};
	static const uint8_t Data2[] = {0, 1, 2, 3, 4, 5, 6, 7, 8};

	zen::CompressedBuffer AttachmentData1 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data1, 4}),
																			zen::OodleCompressor::NotSet,
																			zen::OodleCompressionLevel::None);
	zen::CbAttachment	  Attach1{AttachmentData1, AttachmentData1.DecodeRawHash()};
	zen::CompressedBuffer AttachmentData2 = zen::CompressedBuffer::Compress(zen::SharedBuffer::Clone({Data2, 8}),
																			zen::OodleCompressor::NotSet,
																			zen::OodleCompressionLevel::None);
	zen::CbAttachment	  Attach2{AttachmentData2, AttachmentData2.DecodeRawHash()};

	zen::CbObjectWriter Writer;

	Writer.AddAttachment("attach1", Attach1);
	Writer.AddAttachment("attach2", Attach2);

	zen::CbObject CoreObject = Writer.Save();

	zen::CbPackage TestPackage;
	TestPackage.SetObject(CoreObject);
	TestPackage.AddAttachment(Attach1);
	TestPackage.AddAttachment(Attach2);

	zen::HttpClient			  TestClient(BaseUri);
	zen::HttpClient::Response Response = TestClient.TransactPackage("/testing/package"sv, TestPackage);

	zen::CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);

	CHECK_EQ(ResponsePackage, TestPackage);
}

std::string
OidAsString(const Oid& Id)
{
	StringBuilder<25> OidStringBuilder;
	Id.ToString(OidStringBuilder);
	return OidStringBuilder.ToString();
}

CbPackage
CreateOplogPackage(const Oid& Id, const std::span<const std::pair<Oid, CompressedBuffer>>& Attachments)
{
	CbPackage	   Package;
	CbObjectWriter Object;
	Object << "key"sv << OidAsString(Id);
	if (!Attachments.empty())
	{
		Object.BeginArray("bulkdata");
		for (const auto& Attachment : Attachments)
		{
			CbAttachment Attach(Attachment.second, Attachment.second.DecodeRawHash());
			Object.BeginObject();
			Object << "id"sv << Attachment.first;
			Object << "type"sv
				   << "Standard"sv;
			Object << "data"sv << Attach;
			Object.EndObject();

			Package.AddAttachment(Attach);
			ZEN_DEBUG("Added attachment {}", Attach.GetHash());
		}
		Object.EndArray();
	}
	Package.SetObject(Object.Save());
	return Package;
};

cpr::Body
AsBody(const IoBuffer& Payload)
{
	return cpr::Body{(const char*)Payload.GetData(), Payload.Size()};
};

enum CbWriterMeta
{
	BeginObject,
	EndObject,
	BeginArray,
	EndArray
};

inline CbWriter&
operator<<(CbWriter& Writer, CbWriterMeta Meta)
{
	switch (Meta)
	{
		case BeginObject:
			Writer.BeginObject();
			break;
		case EndObject:
			Writer.EndObject();
			break;
		case BeginArray:
			Writer.BeginArray();
			break;
		case EndArray:
			Writer.EndArray();
			break;
		default:
			ZEN_ASSERT(false);
	}
	return Writer;
}

TEST_CASE("project.remote")
{
	using namespace std::literals;
	using namespace utils;

	ZenServerTestHelper Servers("remote", 3);
	Servers.SpawnServers("--debug");

	std::vector<Oid> OpIds;
	const size_t	 OpCount = 24;
	OpIds.reserve(OpCount);
	for (size_t I = 0; I < OpCount; ++I)
	{
		OpIds.emplace_back(Oid::NewOid());
	}

	std::unordered_map<Oid, std::vector<std::pair<Oid, CompressedBuffer>>, Oid::Hasher> Attachments;
	{
		std::vector<std::size_t> AttachmentSizes(
			{7633,	6825,  5738, 8031,	 7225, 566,	 3656, 6006,  24,	33466, 1093, 4269,	 2257,	3685, 13489, 97194,
			 6151,	5482,  6217, 3511,	 6738, 5061, 7537, 2759,  1916, 8210,  2235, 224024, 51582, 5251, 491,	 2u * 1024u * 1024u + 124u,
			 74607, 18135, 3767, 154045, 4415, 5007, 8876, 96761, 3359, 8526,  4097, 4855,	 48225});
		auto It				   = AttachmentSizes.begin();
		Attachments[OpIds[0]]  = {};
		Attachments[OpIds[1]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[2]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[3]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[4]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
		Attachments[OpIds[5]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[6]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[7]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[8]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
		Attachments[OpIds[9]]  = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[10]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[11]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
		Attachments[OpIds[12]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++, *It++});
		Attachments[OpIds[13]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[14]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[15]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[16]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
		Attachments[OpIds[17]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[18]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++});
		Attachments[OpIds[19]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{});
		Attachments[OpIds[20]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[21]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		Attachments[OpIds[22]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++, *It++, *It++});
		Attachments[OpIds[23]] = CreateSemiRandomAttachments(std::initializer_list<size_t>{*It++});
		ZEN_ASSERT(It == AttachmentSizes.end());
	}

	auto AddOp = [](const CbObject& Op, std::unordered_map<Oid, uint32_t, Oid::Hasher>& Ops) {
		XXH3_128Stream KeyHasher;
		Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
		XXH3_128 KeyHash = KeyHasher.GetHash();
		Oid		 Id;
		memcpy(Id.OidBits, &KeyHash, sizeof Id.OidBits);
		IoBuffer	   Buffer	  = Op.GetBuffer().AsIoBuffer();
		const uint32_t OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), Buffer.GetSize()) & 0xffffFFFF);
		Ops.insert({Id, OpCoreHash});
	};

	auto MakeProject = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName) {
		CbObjectWriter Project;
		Project.AddString("id"sv, ProjectName);
		Project.AddString("root"sv, ""sv);
		Project.AddString("engine"sv, ""sv);
		Project.AddString("project"sv, ""sv);
		Project.AddString("projectfile"sv, ""sv);
		IoBuffer	ProjectPayload = Project.Save().GetBuffer().AsIoBuffer();
		std::string ProjectRequest = fmt::format("{}/prj/{}", UrlBase, ProjectName);
		Session.SetUrl({ProjectRequest});
		Session.SetBody(cpr::Body{(const char*)ProjectPayload.GetData(), ProjectPayload.GetSize()});
		cpr::Response Response = Session.Post();
		CHECK(IsHttpSuccessCode(Response.status_code));
	};

	auto MakeOplog = [](cpr::Session& Session, std::string_view UrlBase, std::string_view ProjectName, std::string_view OplogName) {
		std::string CreateOplogRequest = fmt::format("{}/prj/{}/oplog/{}", UrlBase, ProjectName, OplogName);
		Session.SetUrl({CreateOplogRequest});
		Session.SetBody(cpr::Body{});
		cpr::Response Response = Session.Post();
		CHECK(IsHttpSuccessCode(Response.status_code));
	};

	auto MakeOp = [](cpr::Session&	  Session,
					 std::string_view UrlBase,
					 std::string_view ProjectName,
					 std::string_view OplogName,
					 const CbPackage& OpPackage) {
		std::string CreateOpRequest = fmt::format("{}/prj/{}/oplog/{}/new", UrlBase, ProjectName, OplogName);
		Session.SetUrl({CreateOpRequest});
		zen::BinaryWriter MemOut;
		legacy::SaveCbPackage(OpPackage, MemOut);
		Session.SetBody(cpr::Body{(const char*)MemOut.Data(), MemOut.Size()});
		cpr::Response Response = Session.Post();
		CHECK(IsHttpSuccessCode(Response.status_code));
	};

	cpr::Session Session;
	MakeProject(Session, Servers.GetInstance(0).GetBaseUri(), "proj0");
	MakeOplog(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");

	std::unordered_map<Oid, uint32_t, Oid::Hasher> SourceOps;
	for (const Oid& OpId : OpIds)
	{
		CbPackage OpPackage = CreateOplogPackage(OpId, Attachments[OpId]);
		CHECK(OpPackage.GetAttachments().size() == Attachments[OpId].size());
		AddOp(OpPackage.GetObject(), SourceOps);
		MakeOp(Session, Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0", OpPackage);
	}

	std::vector<IoHash> AttachmentHashes;
	AttachmentHashes.reserve(Attachments.size());
	for (const auto& AttachmentOplog : Attachments)
	{
		for (const auto& Attachment : AttachmentOplog.second)
		{
			AttachmentHashes.emplace_back(Attachment.second.DecodeRawHash());
		}
	}

	auto MakeCbObjectPayload = [](std::function<void(CbObjectWriter & Writer)> Write) -> IoBuffer {
		CbObjectWriter Writer;
		Write(Writer);
		IoBuffer Result = Writer.Save().GetBuffer().AsIoBuffer();
		Result.MakeOwned();
		return Result;
	};

	auto ValidateAttachments = [&MakeCbObjectPayload, &AttachmentHashes, &Servers, &Session](int			  ServerIndex,
																							 std::string_view Project,
																							 std::string_view Oplog) {
		std::string GetChunksRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
		Session.SetUrl({GetChunksRequest});
		IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes](CbObjectWriter& Writer) {
			Writer << "method"sv
				   << "getchunks"sv;
			Writer << "chunks"sv << BeginArray;
			for (const IoHash& Chunk : AttachmentHashes)
			{
				Writer << Chunk;
			}
			Writer << EndArray;	 // chunks
		});
		Session.SetBody(AsBody(Payload));
		Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
		cpr::Response Response = Session.Post();
		CHECK(IsHttpSuccessCode(Response.status_code));
		CbPackage ResponsePackage = ParsePackageMessage(IoBuffer(IoBuffer::Wrap, Response.text.data(), Response.text.size()));
		CHECK(ResponsePackage.GetAttachments().size() == AttachmentHashes.size());
		for (auto A : ResponsePackage.GetAttachments())
		{
			CHECK(IoHash::HashBuffer(A.AsCompressedBinary().DecompressToComposite()) == A.GetHash());
		}
	};

	auto ValidateOplog = [&SourceOps, &AddOp, &Servers, &Session](int ServerIndex, std::string_view Project, std::string_view Oplog) {
		std::unordered_map<Oid, uint32_t, Oid::Hasher> TargetOps;
		std::vector<CbObject>						   ResultingOplog;

		std::string GetOpsRequest =
			fmt::format("{}/prj/{}/oplog/{}/entries", Servers.GetInstance(ServerIndex).GetBaseUri(), Project, Oplog);
		Session.SetUrl({GetOpsRequest});
		cpr::Response Response = Session.Get();
		CHECK(IsHttpSuccessCode(Response.status_code));

		IoBuffer	Payload(IoBuffer::Wrap, Response.text.data(), Response.text.size());
		CbObject	OplogResonse = LoadCompactBinaryObject(Payload);
		CbArrayView EntriesArray = OplogResonse["entries"sv].AsArrayView();

		for (CbFieldView OpEntry : EntriesArray)
		{
			CbObjectView Core = OpEntry.AsObjectView();
			BinaryWriter Writer;
			Core.CopyTo(Writer);
			MemoryView OpView = Writer.GetView();
			IoBuffer   OpBuffer(IoBuffer::Wrap, OpView.GetData(), OpView.GetSize());
			CbObject   Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
			AddOp(Op, TargetOps);
		}
		CHECK(SourceOps == TargetOps);
	};

	auto WaitForCompletion = [&Session](ZenServerInstance& Server, const cpr::Response& Response) {
		CHECK(IsHttpSuccessCode(Response.status_code));
		uint64_t JobId = ParseInt<uint64_t>(Response.text).value_or(0);
		CHECK(JobId != 0);
		Session.SetUrl(fmt::format("{}/admin/jobs/{}", Server.GetBaseUri(), JobId));
		Session.SetHeader(cpr::Header{{"Accept", std::string(ToString(ZenContentType::kCbObject))}});
		while (true)
		{
			cpr::Response StatusResponse = Session.Get();
			CHECK(IsHttpSuccessCode(StatusResponse.status_code));
			CbObject ReponseObject =
				LoadCompactBinaryObject(IoBuffer(IoBuffer::Wrap, StatusResponse.text.data(), StatusResponse.text.size()));
			std::string_view Status = ReponseObject["Status"sv].AsString();
			CHECK(Status != "Aborted"sv);
			if (Status == "Complete"sv)
			{
				return;
			}
			Sleep(10);
		}
	};

	SUBCASE("File")
	{
		ScopedTemporaryDirectory TempDir;
		{
			std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
			Session.SetUrl({SaveOplogRequest});

			IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "export"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "maxblocksize"sv << 3072u;
					Writer << "maxchunkembedsize"sv << 1296u;
					Writer << "chunkfilesizelimit"sv << 5u * 1024u;
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << path;
						Writer << "name"sv
							   << "proj0_oplog0"sv;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(0), Response);
		}
		{
			MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
			MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
			std::string LoadOplogRequest =
				fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
			Session.SetUrl({LoadOplogRequest});

			IoBuffer Payload = MakeCbObjectPayload([&AttachmentHashes, path = TempDir.Path().string()](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "import"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << path;
						Writer << "name"sv
							   << "proj0_oplog0"sv;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));

			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(1), Response);
		}
		ValidateAttachments(1, "proj0_copy", "oplog0_copy");
		ValidateOplog(1, "proj0_copy", "oplog0_copy");
	}

	SUBCASE("File disable blocks")
	{
		ScopedTemporaryDirectory TempDir;
		{
			std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
			Session.SetUrl({SaveOplogRequest});

			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "export"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "maxblocksize"sv << 3072u;
					Writer << "maxchunkembedsize"sv << 1296u;
					Writer << "chunkfilesizelimit"sv << 5u * 1024u;
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << TempDir.Path().string();
						Writer << "name"sv
							   << "proj0_oplog0"sv;
						Writer << "disableblocks"sv << true;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(0), Response);
		}
		{
			MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
			MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
			std::string LoadOplogRequest =
				fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
			Session.SetUrl({LoadOplogRequest});
			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "import"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << TempDir.Path().string();
						Writer << "name"sv
							   << "proj0_oplog0"sv;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(1), Response);
		}
		ValidateAttachments(1, "proj0_copy", "oplog0_copy");
		ValidateOplog(1, "proj0_copy", "oplog0_copy");
	}

	SUBCASE("File force temp blocks")
	{
		ScopedTemporaryDirectory TempDir;
		{
			std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(0).GetBaseUri(), "proj0", "oplog0");
			Session.SetUrl({SaveOplogRequest});
			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "export"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "maxblocksize"sv << 3072u;
					Writer << "maxchunkembedsize"sv << 1296u;
					Writer << "chunkfilesizelimit"sv << 5u * 1024u;
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << TempDir.Path().string();
						Writer << "name"sv
							   << "proj0_oplog0"sv;
						Writer << "enabletempblocks"sv << true;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(0), Response);
		}
		{
			MakeProject(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy");
			MakeOplog(Session, Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
			std::string LoadOplogRequest =
				fmt::format("{}/prj/{}/oplog/{}/rpc", Servers.GetInstance(1).GetBaseUri(), "proj0_copy", "oplog0_copy");
			Session.SetUrl({LoadOplogRequest});
			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "import"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "force"sv << false;
					Writer << "file"sv << BeginObject;
					{
						Writer << "path"sv << TempDir.Path().string();
						Writer << "name"sv
							   << "proj0_oplog0"sv;
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(1), Response);
		}
		ValidateAttachments(1, "proj0_copy", "oplog0_copy");
		ValidateOplog(1, "proj0_copy", "oplog0_copy");
	}

	SUBCASE("Zen")
	{
		ScopedTemporaryDirectory TempDir;
		{
			std::string ExportSourceUri = Servers.GetInstance(0).GetBaseUri();
			std::string ExportTargetUri = Servers.GetInstance(1).GetBaseUri();
			MakeProject(Session, ExportTargetUri, "proj0_copy");
			MakeOplog(Session, ExportTargetUri, "proj0_copy", "oplog0_copy");

			std::string SaveOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ExportSourceUri, "proj0", "oplog0");
			Session.SetUrl({SaveOplogRequest});

			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "export"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "maxblocksize"sv << 3072u;
					Writer << "maxchunkembedsize"sv << 1296u;
					Writer << "chunkfilesizelimit"sv << 5u * 1024u;
					Writer << "force"sv << false;
					Writer << "zen"sv << BeginObject;
					{
						Writer << "url"sv << ExportTargetUri.substr(7);
						Writer << "project"
							   << "proj0_copy";
						Writer << "oplog"
							   << "oplog0_copy";
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(0), Response);
		}
		ValidateAttachments(1, "proj0_copy", "oplog0_copy");
		ValidateOplog(1, "proj0_copy", "oplog0_copy");

		{
			std::string ImportSourceUri = Servers.GetInstance(1).GetBaseUri();
			std::string ImportTargetUri = Servers.GetInstance(2).GetBaseUri();
			MakeProject(Session, ImportTargetUri, "proj1");
			MakeOplog(Session, ImportTargetUri, "proj1", "oplog1");
			std::string LoadOplogRequest = fmt::format("{}/prj/{}/oplog/{}/rpc", ImportTargetUri, "proj1", "oplog1");
			Session.SetUrl({LoadOplogRequest});

			IoBuffer Payload = MakeCbObjectPayload([&](CbObjectWriter& Writer) {
				Writer << "method"sv
					   << "import"sv;
				Writer << "params" << BeginObject;
				{
					Writer << "force"sv << false;
					Writer << "zen"sv << BeginObject;
					{
						Writer << "url"sv << ImportSourceUri.substr(7);
						Writer << "project"
							   << "proj0_copy";
						Writer << "oplog"
							   << "oplog0_copy";
					}
					Writer << EndObject;  // "file"
				}
				Writer << EndObject;  // "params"
			});
			Session.SetBody(AsBody(Payload));
			Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}});
			cpr::Response Response = Session.Post();
			WaitForCompletion(Servers.GetInstance(2), Response);
		}
		ValidateAttachments(2, "proj1", "oplog1");
		ValidateOplog(2, "proj1", "oplog1");
	}
}

std::vector<std::pair<std::filesystem::path, IoBuffer>>
GenerateFolderContent(const std::filesystem::path& RootPath)
{
	CreateDirectories(RootPath);
	std::vector<std::pair<std::filesystem::path, IoBuffer>> Result;
	Result.push_back(std::make_pair(RootPath / "root_blob_1.bin", CreateRandomBlob(4122)));
	Result.push_back(std::make_pair(RootPath / "root_blob_2.bin", CreateRandomBlob(2122)));

	std::filesystem::path EmptyFolder(RootPath / "empty_folder");

	std::filesystem::path FirstFolder(RootPath / "first_folder");
	std::filesystem::create_directory(FirstFolder);
	Result.push_back(std::make_pair(FirstFolder / "first_folder_blob1.bin", CreateRandomBlob(22)));
	Result.push_back(std::make_pair(FirstFolder / "first_folder_blob2.bin", CreateRandomBlob(122)));

	std::filesystem::path SecondFolder(RootPath / "second_folder");
	std::filesystem::create_directory(SecondFolder);
	Result.push_back(std::make_pair(SecondFolder / "second_folder_blob1.bin", CreateRandomBlob(522)));
	Result.push_back(std::make_pair(SecondFolder / "second_folder_blob2.bin", CreateRandomBlob(122)));
	Result.push_back(std::make_pair(SecondFolder / "second_folder_blob3.bin", CreateRandomBlob(225)));

	std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second");
	std::filesystem::create_directory(SecondFolderChild);
	Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob1.bin", CreateRandomBlob(622)));

	for (const auto& It : Result)
	{
		WriteFile(It.first, It.second);
	}

	return Result;
}

std::vector<std::pair<std::filesystem::path, IoBuffer>>
GenerateFolderContent2(const std::filesystem::path& RootPath)
{
	std::vector<std::pair<std::filesystem::path, IoBuffer>> Result;
	Result.push_back(std::make_pair(RootPath / "root_blob_3.bin", CreateRandomBlob(312)));
	std::filesystem::path FirstFolder(RootPath / "first_folder");
	Result.push_back(std::make_pair(FirstFolder / "first_folder_blob3.bin", CreateRandomBlob(722)));
	std::filesystem::path SecondFolder(RootPath / "second_folder");
	std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second");
	Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob2.bin", CreateRandomBlob(962)));
	Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob3.bin", CreateRandomBlob(561)));

	for (const auto& It : Result)
	{
		WriteFile(It.first, It.second);
	}

	return Result;
}

TEST_CASE("workspaces.create")
{
	using namespace std::literals;

	std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir();

	std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
	ZenServerInstance	  Instance(TestEnv);
	Instance.SetTestDir(TestDir);
	const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath));
	CHECK(PortNumber != 0);

	ScopedTemporaryDirectory TempDir;
	std::filesystem::path	 Root1Path	= TempDir.Path() / "root1";
	std::filesystem::path	 Root2Path	= TempDir.Path() / "root2";
	std::filesystem::path	 Share1Path = "shared_1";
	std::filesystem::path	 Share2Path = "shared_2";
	CreateDirectories(Share1Path);
	CreateDirectories(Share2Path);

	Oid Root1Id = Oid::Zero;
	Oid Root2Id = Oid::NewOid();

	HttpClient Client(Instance.GetBaseUri());

	CHECK(Client.Put(fmt::format("/ws/{}", Root1Id)).StatusCode == HttpResponseCode::BadRequest);

	if (HttpClient::Response Root1Response =
			Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}});
		Root1Response.StatusCode == HttpResponseCode::Created)
	{
		Root1Id = Oid::TryFromHexString(Root1Response.AsText());
		CHECK(Root1Id != Oid::Zero);
	}
	else
	{
		CHECK(false);
	}
	if (HttpClient::Response Root1Response =
			Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}});
		Root1Response.StatusCode == HttpResponseCode::OK)
	{
		CHECK(Root1Id == Oid::TryFromHexString(Root1Response.AsText()));
	}
	else
	{
		CHECK(false);
	}
	if (HttpClient::Response Root1Response =
			Client.Put(fmt::format("/ws/{}", Root1Id), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}});
		Root1Response.StatusCode == HttpResponseCode::OK)
	{
		CHECK(Root1Id == Oid::TryFromHexString(Root1Response.AsText()));
	}
	else
	{
		CHECK(false);
	}
	CHECK(Client.Put(fmt::format("/ws/{}", Root1Id), HttpClient::KeyValueMap{{"root_path", Root2Path.string()}}).StatusCode ==
		  HttpResponseCode::Conflict);

	CHECK(
		Client.Put(fmt::format("/ws/{}/{}", Root1Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode ==
		HttpResponseCode::Created);

	CHECK(
		Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode ==
		HttpResponseCode::NotFound);

	if (HttpClient::Response Root2Response =
			Client.Put(fmt::format("/ws/{}", Root2Id), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}});
		Root2Response.StatusCode == HttpResponseCode::Created)
	{
		CHECK(Root2Id == Oid::TryFromHexString(Root2Response.AsText()));
	}
	else
	{
		CHECK(false);
	}

	CHECK(Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero)).StatusCode == HttpResponseCode::BadRequest);

	Oid Share2Id = Oid::Zero;
	if (HttpClient::Response Share2Response =
			Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}});
		Share2Response.StatusCode == HttpResponseCode::Created)
	{
		Share2Id = Oid::TryFromHexString(Share2Response.AsText());
		CHECK(Share2Id != Oid::Zero);
	}

	CHECK(
		Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode ==
		HttpResponseCode::OK);

	CHECK(
		Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode ==
		HttpResponseCode::OK);

	CHECK(
		Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share1Path.string()}}).StatusCode ==
		HttpResponseCode::Conflict);
}

TEST_CASE("workspaces.lifetimes")
{
	using namespace std::literals;

	std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir();

	Oid WorkspaceId = Oid::NewOid();
	Oid ShareId		= Oid::NewOid();

	{
		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
		ZenServerInstance	  Instance(TestEnv);
		Instance.SetTestDir(TestDir);
		const uint16_t PortNumber =
			Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath));
		CHECK(PortNumber != 0);

		ScopedTemporaryDirectory TempDir;
		std::filesystem::path	 RootPath  = TempDir.Path();
		std::filesystem::path	 SharePath = RootPath / "shared_folder";
		CreateDirectories(SharePath);

		HttpClient Client(Instance.GetBaseUri());
		CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode ==
			  HttpResponseCode::Created);
		CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId);
		CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode ==
			  HttpResponseCode::OK);

		CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", "shared_folder"}})
				  .StatusCode == HttpResponseCode::Created);
		CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId);
		CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", "shared_folder"}})
				  .StatusCode == HttpResponseCode::OK);
	}

	// Restart

	{
		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
		ZenServerInstance	  Instance(TestEnv);
		Instance.SetTestDir(TestDir);
		const uint16_t PortNumber =
			Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath));
		CHECK(PortNumber != 0);

		HttpClient Client(Instance.GetBaseUri());
		CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId);

		CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId);
	}

	// Wipe system config
	std::filesystem::remove_all(SystemRootPath);

	// Restart

	{
		std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
		ZenServerInstance	  Instance(TestEnv);
		Instance.SetTestDir(TestDir);
		const uint16_t PortNumber =
			Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath));
		CHECK(PortNumber != 0);

		HttpClient Client(Instance.GetBaseUri());
		CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound);
		CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).StatusCode == HttpResponseCode::NotFound);
	}
}

TEST_CASE("workspaces.share")
{
	ZenServerInstance Instance(TestEnv);

	const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--workspaces-enabled");
	CHECK(PortNumber != 0);

	ScopedTemporaryDirectory TempDir;
	std::filesystem::path	 RootPath  = TempDir.Path();
	std::filesystem::path	 SharePath = RootPath / "shared_folder";
	GenerateFolderContent(SharePath);

	HttpClient Client(Instance.GetBaseUri());

	Oid WorkspaceId = Oid::NewOid();
	CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode ==
		  HttpResponseCode::Created);
	CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId);

	Oid ShareId = Oid::NewOid();
	CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", "shared_folder"}}).StatusCode ==
		  HttpResponseCode::Created);
	CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId);

	CHECK(Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId)).AsObject()["files"sv].AsArrayView().Num() == 8);
	GenerateFolderContent2(SharePath);
	CHECK(Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId)).AsObject()["files"sv].AsArrayView().Num() == 8);
	HttpClient::Response FilesResponse =
		Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId),
				   {},
				   HttpClient::KeyValueMap{{"refresh", ToString(true)}, {"fieldnames", "id,clientpath,size"}});
	CHECK(FilesResponse);
	std::unordered_map<Oid, std::pair<std::filesystem::path, uint64_t>, Oid::Hasher> Files;
	{
		CbArrayView FilesArray = FilesResponse.AsObject()["files"sv].AsArrayView();
		CHECK(FilesArray.Num() == 12);
		for (CbFieldView Field : FilesArray)
		{
			CbObjectView FileObject = Field.AsObjectView();
			Oid			 ChunkId	= FileObject["id"sv].AsObjectId();
			CHECK(ChunkId != Oid::Zero);
			uint64_t			  Size		  = FileObject["size"sv].AsUInt64();
			std::u8string_view	  Path		  = FileObject["clientpath"sv].AsU8String();
			std::filesystem::path AbsFilePath = SharePath / Path;
			CHECK(std::filesystem::is_regular_file(AbsFilePath));
			CHECK(std::filesystem::file_size(AbsFilePath) == Size);
			Files.insert_or_assign(ChunkId, std::make_pair(AbsFilePath, Size));
		}
	}

	HttpClient::Response EntriesResponse =
		Client.Get(fmt::format("/ws/{}/{}/entries", WorkspaceId, ShareId), {}, HttpClient::KeyValueMap{{"fieldfilter", "id,clientpath"}});
	CHECK(EntriesResponse);
	{
		CbArrayView EntriesArray = EntriesResponse.AsObject()["entries"sv].AsArrayView();
		CHECK(EntriesArray.Num() == 1);
		for (CbFieldView EntryField : EntriesArray)
		{
			CbObjectView EntryObject = EntryField.AsObjectView();
			CbArrayView	 FilesArray	 = EntryObject["files"sv].AsArrayView();
			CHECK(FilesArray.Num() == 12);
			for (CbFieldView FileField : FilesArray)
			{
				CbObjectView FileObject = FileField.AsObjectView();
				Oid			 ChunkId	= FileObject["id"sv].AsObjectId();
				CHECK(ChunkId != Oid::Zero);
				std::u8string_view	  Path		  = FileObject["clientpath"sv].AsU8String();
				std::filesystem::path AbsFilePath = SharePath / Path;
				CHECK(std::filesystem::is_regular_file(AbsFilePath));
			}
		}
	}

	HttpClient::Response FileManifestResponse =
		Client.Get(fmt::format("/ws/{}/{}/entries", WorkspaceId, ShareId),
				   {},
				   HttpClient::KeyValueMap{{"opkey", "file_manifest"}, {"fieldfilter", "id,clientpath"}});
	CHECK(FileManifestResponse);
	{
		CbArrayView EntriesArray = FileManifestResponse.AsObject()["entry"sv].AsObjectView()["files"sv].AsArrayView();
		CHECK(EntriesArray.Num() == 12);
		for (CbFieldView Field : EntriesArray)
		{
			CbObjectView FileObject = Field.AsObjectView();
			Oid			 ChunkId	= FileObject["id"sv].AsObjectId();
			CHECK(ChunkId != Oid::Zero);
			std::u8string_view	  Path		  = FileObject["clientpath"sv].AsU8String();
			std::filesystem::path AbsFilePath = SharePath / Path;
			CHECK(std::filesystem::is_regular_file(AbsFilePath));
		}
	}

	for (auto It : Files)
	{
		const Oid&					 ChunkId = It.first;
		const std::filesystem::path& Path	 = It.second.first;
		const uint64_t				 Size	 = It.second.second;

		CHECK(Client.Get(fmt::format("/ws/{}/{}/{}/info", WorkspaceId, ShareId, ChunkId)).AsObject()["size"sv].AsUInt64() == Size);

		{
			IoBuffer Payload = Client.Get(fmt::format("/ws/{}/{}/{}", WorkspaceId, ShareId, ChunkId)).ResponsePayload;
			CHECK(Payload);
			CHECK(Payload.GetSize() == Size);
			IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path);
			CHECK(FileContent);
			CHECK(FileContent.GetView().EqualBytes(Payload.GetView()));
		}

		{
			IoBuffer Payload =
				Client
					.Get(fmt::format("/ws/{}/{}/{}", WorkspaceId, ShareId, ChunkId),
						 {},
						 HttpClient::KeyValueMap{{"offset", fmt::format("{}", Size / 4)}, {"size", fmt::format("{}", Size / 2)}})
					.ResponsePayload;
			CHECK(Payload);
			CHECK(Payload.GetSize() == Size / 2);
			IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path, Size / 4, Size / 2);
			CHECK(FileContent);
			CHECK(FileContent.GetView().EqualBytes(Payload.GetView()));
		}
	}

	{
		uint32_t					   CorrelationId = gsl::narrow<uint32_t>(Files.size());
		std::vector<RequestChunkEntry> BatchEntries;
		for (auto It : Files)
		{
			const Oid&	   ChunkId = It.first;
			const uint64_t Size	   = It.second.second;

			BatchEntries.push_back(
				RequestChunkEntry{.ChunkId = ChunkId, .CorrelationId = --CorrelationId, .Offset = Size / 4, .RequestBytes = Size / 2});
		}
		IoBuffer BatchResponse =
			Client.Post(fmt::format("/ws/{}/{}/batch", WorkspaceId, ShareId), BuildChunkBatchRequest(BatchEntries)).ResponsePayload;
		CHECK(BatchResponse);
		std::vector<IoBuffer> BatchResult = ParseChunkBatchResponse(BatchResponse);
		CHECK(BatchResult.size() == Files.size());
		for (const RequestChunkEntry& Request : BatchEntries)
		{
			IoBuffer					 Result = BatchResult[Request.CorrelationId];
			auto						 It		= Files.find(Request.ChunkId);
			const std::filesystem::path& Path	= It->second.first;
			CHECK(Result.GetSize() == Request.RequestBytes);
			IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path, Request.Offset, Request.RequestBytes);
			CHECK(FileContent);
			CHECK(FileContent.GetView().EqualBytes(Result.GetView()));
		}
	}

	CHECK(Client.Delete(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)));
	CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).StatusCode == HttpResponseCode::NotFound);
	CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)));

	CHECK(Client.Delete(fmt::format("/ws/{}", WorkspaceId)));
	CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound);
}

#	if 0
TEST_CASE("lifetime.owner")
{
	// This test is designed to verify that the hand-over of sponsor processes is handled
	// correctly for the case when a second or third process is launched on the same port
	//
	// Due to the nature of it, it cannot be

	const uint16_t PortNumber = 23456;

	ZenServerInstance	  Zen1(TestEnv);
	std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
	Zen1.SetTestDir(TestDir1);
	Zen1.SpawnServer(PortNumber);
	Zen1.WaitUntilReady();
	Zen1.Detach();

	ZenServerInstance	  Zen2(TestEnv);
	std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();
	Zen2.SetTestDir(TestDir2);
	Zen2.SpawnServer(PortNumber);
	Zen2.WaitUntilReady();
	Zen2.Detach();
}

TEST_CASE("lifetime.owner.2")
{
	// This test is designed to verify that the hand-over of sponsor processes is handled
	// correctly for the case when a second or third process is launched on the same port
	//
	// Due to the nature of it, it cannot be

	const uint16_t PortNumber = 13456;

	std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir();
	std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir();

	ZenServerInstance Zen1(TestEnv);
	Zen1.SetTestDir(TestDir1);
	Zen1.SpawnServer(PortNumber);
	Zen1.WaitUntilReady();

	ZenServerInstance Zen2(TestEnv);
	Zen2.SetTestDir(TestDir2);
	Zen2.SetOwnerPid(Zen1.GetPid());
	Zen2.SpawnServer(PortNumber + 1);
	Zen2.Detach();

	ZenServerInstance Zen3(TestEnv);
	Zen3.SetTestDir(TestDir2);
	Zen3.SetOwnerPid(Zen1.GetPid());
	Zen3.SpawnServer(PortNumber + 1);
	Zen3.Detach();

	ZenServerInstance Zen4(TestEnv);
	Zen4.SetTestDir(TestDir2);
	Zen4.SetOwnerPid(Zen1.GetPid());
	Zen4.SpawnServer(PortNumber + 1);
	Zen4.Detach();
}
#	endif

}  // namespace zen::tests
#else
int
main()
{
}
#endif
